[ 
https://issues.apache.org/jira/browse/NIFI-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088033#comment-18088033
 ] 

ASF subversion and git services commented on NIFI-16003:
--------------------------------------------------------

Commit 6f1722052b56c6ad6f932fcfa5d547b1848e0d9b in nifi's branch 
refs/heads/main from Alaksiej Ščarbaty
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=6f1722052b5 ]

NIFI-16003 Re-copy checkpoints before deleting lingering migration table 
(#11320)

When a prior legacy-checkpoint migration failed mid-flight (e.g. step 5
copy threw because DynamoDB's request router still cached the old table
schema after recreation), the migration table was left behind as the
only surviving copy of the per-shard checkpoint state. On restart,
LegacyCheckpointMigrator.cleanupLingeringMigration would unconditionally
delete that table, silently losing all checkpoints and forcing every
shard back to INITIAL_STREAM_POSITION.

Re-run the copy from the migration table into the checkpoint table
before deleting it. This is safe because any failure in the rename
sequence propagates out of ensureCheckpointTableExists and prevents the
processor from starting, so the checkpoint table cannot have advanced
past the migration table's contents — the re-copy either restores lost
state or re-puts identical rows.
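
The idempotency argument above can be illustrated with a minimal sketch. It assumes in-memory maps as stand-ins for the two DynamoDB tables; the class name, method name, shard ids, and sequence values are hypothetical and are not taken from the NiFi code base.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory model of the recovery step; not the NiFi implementation. */
final class LingeringMigrationRecovery {

    /**
     * Copies every row from the migration table into the checkpoint table and
     * only then clears the migration table (the stand-in for deleteTable).
     * Rows are keyed by shard id, so re-putting an existing row overwrites it
     * with an identical value.
     */
    static void recover(Map<String, String> migrationTable, Map<String, String> checkpointTable) {
        checkpointTable.putAll(migrationTable); // re-run the copy first
        migrationTable.clear();                 // delete only after the copy succeeded
    }

    /** Builds the assumed contents of a lingering migration table. */
    static Map<String, String> sampleMigrationTable() {
        Map<String, String> rows = new LinkedHashMap<>();
        rows.put("shardId-000", "seq-41");
        rows.put("shardId-001", "seq-97");
        return rows;
    }

    public static void main(String[] args) {
        // Case 1: the earlier copy failed before writing anything.
        Map<String, String> emptyCheckpoints = new LinkedHashMap<>();
        recover(sampleMigrationTable(), emptyCheckpoints);
        System.out.println("restored lost state: " + emptyCheckpoints);

        // Case 2: the earlier copy had already written one row.
        Map<String, String> partialCheckpoints = new LinkedHashMap<>();
        partialCheckpoints.put("shardId-000", "seq-41");
        recover(sampleMigrationTable(), partialCheckpoints);
        System.out.println("re-put identical rows: " + partialCheckpoints);
    }
}
```

Both cases end with the same checkpoint contents. The sketch relies on the property stated in the commit message: the checkpoint table cannot have advanced past the migration table, because the processor never started.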

> ConsumeKinesis can drop checkpoints after a failed migration
> ------------------------------------------------------------
>
>                 Key: NIFI-16003
>                 URL: https://issues.apache.org/jira/browse/NIFI-16003
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.9.0
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Alaksiej Ščarbaty
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> h1. ConsumeKinesis legacy checkpoint migration can drop the migration table 
> before the new checkpoint table is populated
> h2. Affected component
> _nifi-aws-kinesis_ bundle: _LegacyCheckpointMigrator_ class.
> h2. Problem
> _LegacyCheckpointMigrator.renameMigrationTable_ performs:
> 1. _deleteTable(checkpointTableName)_ — drop the legacy table.
> 2. {_}waitForTableDeleted(checkpointTableName){_}.
> 3. _createNewSchemaTable(checkpointTableName)_ — recreate with new schema, 
> same name.
> 4. {_}waitForTableActive(checkpointTableName){_}.
> 5. {_}copyCheckpointItems(migrationTableName → checkpointTableName){_}.
> 6. {_}deleteTable(migrationTableName){_}.
> If step 5 throws after step 3, the only surviving copy of the migrated state 
> is in the migration table. On restart, 
> _KinesisShardManager.ensureCheckpointTableExists_ sees _case NEW_ (the recreated 
> table has the new schema) and calls {_}cleanupLingeringMigration{_}, which 
> {*}deletes the migration table without rerunning the copy{*}. The "legacy 
> checkpoint table retains original data" comment is wrong here — the legacy 
> table was already deleted in step 1.
> Result: silent loss of all per-shard checkpoint state; every shard restarts 
> from {_}INITIAL_STREAM_POSITION{_}.
> h2. Observed trigger: DynamoDB request-router schema cache
> After the original table has been recreated with a new key schema, writing 
> records to it can fail with the following error, which indicates that some 
> layer of DynamoDB request processing still sees the old table schema:
> {{DynamoDbException: One or more parameter values were invalid:}}
> {{Missing the key leaseKey in the item (Status Code: 400)}}
> The assumption is that step 5 fails on a fraction of streams because the 
> same-name delete+recreate races against DynamoDB's request-router cache. Per 
> the [USENIX ATC 2022 
> paper|https://cdn.amazon.science/33/9d/b77f13fe49a798ece85cf3f9be6d/amazon-dynamodb-a-scalable-predictably-performant-and-fully-managed-nosql-database-service.pdf]
> (6.6 Metadata availability), routers cache the table key schema. The metadata 
> service updates immediately and _DescribeTable_ returns {_}ACTIVE{_}, but 
> routers keep the old key schema in their cache. The first _PutItem_ hits the 
> stale cache and is validated against the old schema.
> _ValidationException_ is non-retryable, so step 5 terminates on the first 
> item. (Any transient step-5 error reproduces the same outcome — the cache 
> race is just the most reliable trigger.)
> Observed in a real-world deployment: 6 of 9 streams failed on a single upgrade. Streams 
> whose first put landed >50 ms after _ACTIVE_ succeeded; those landing within 
> ~15–30 ms failed and lost state on restart.
> h2. Proposed fix
> In {_}KinesisShardManager.ensureCheckpointTableExists{_}, treat _case NEW_ + 
> migration-table-present as an {*}incomplete migration{*}. A clean 
> _renameMigrationTable_ run deletes the migration table at the end, so its 
> presence proves a prior attempt failed between steps 3 and 6.
> Replace _cleanupLingeringMigration_ in this branch with a recovery that:
>  - Re-runs {_}copyCheckpointItems(migrationTableName, checkpointTableName){_}.
>  - Only after that deletes the lingering migration table.
> h2. Impact
> For _TRIM_HORIZON_ initial checkpoint position, total checkpoint loss can 
> mean reprocessing up to 24 h of records and downstream duplicates.
> For _LATEST_ initial checkpoint position, total checkpoint loss means a 
> potential loss of data.
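
The six-step sequence in the Problem section can be modelled with a rough sketch that injects a failure at step 5. It assumes a map of table name to rows as a stand-in for DynamoDB; the class, the table names `checkpoints` and `checkpoints_migration`, and the row values are hypothetical, and `cleanupWithoutCopy` only mimics the behaviour the report describes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the rename sequence; not the NiFi implementation. */
final class RenameSequenceModel {

    /** Table name -> rows (shard id -> checkpoint). A missing key means the table does not exist. */
    final Map<String, Map<String, String>> tables = new HashMap<>();

    /** Runs steps 1-6; when failCopy is true, step 5 throws before writing any row. */
    void renameMigrationTable(String checkpointTable, String migrationTable, boolean failCopy) {
        tables.remove(checkpointTable);               // steps 1-2: delete the legacy table and wait
        tables.put(checkpointTable, new HashMap<>()); // steps 3-4: recreate with the new schema and wait
        if (failCopy) {
            throw new IllegalStateException("simulated step 5 failure");
        }
        tables.get(checkpointTable).putAll(tables.get(migrationTable)); // step 5: copy items
        tables.remove(migrationTable);                                  // step 6: delete migration table
    }

    /** Behaviour described in the report: the lingering table is dropped without a copy. */
    void cleanupWithoutCopy(String migrationTable) {
        tables.remove(migrationTable);
    }

    public static void main(String[] args) {
        RenameSequenceModel model = new RenameSequenceModel();
        model.tables.put("checkpoints", new HashMap<>(Map.of("shardId-000", "legacy-seq-41")));
        model.tables.put("checkpoints_migration", new HashMap<>(Map.of("shardId-000", "seq-41")));

        try {
            model.renameMigrationTable("checkpoints", "checkpoints_migration", true);
        } catch (IllegalStateException e) {
            System.out.println("migration failed: " + e.getMessage());
        }
        // The recreated checkpoint table is empty; only the migration table holds state.
        System.out.println("after failure: " + model.tables);

        model.cleanupWithoutCopy("checkpoints_migration");
        // No table holds the checkpoint any more.
        System.out.println("after restart cleanup: " + model.tables);
    }
}
```

In this model the legacy rows are already gone after step 1, so once the migration table is dropped nothing remains to restore from, which is the silent loss the report describes.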



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
