Alaksiej Ščarbaty created NIFI-16003:
----------------------------------------

             Summary: ConsumeKinesis can drop checkpoints after a failed 
migration
                 Key: NIFI-16003
                 URL: https://issues.apache.org/jira/browse/NIFI-16003
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
    Affects Versions: 2.9.0
            Reporter: Alaksiej Ščarbaty
            Assignee: Alaksiej Ščarbaty


h1. ConsumeKinesis legacy checkpoint migration can drop the migration table 
before the new checkpoint table is populated
h2. Affected component

_nifi-aws-kinesis_ bundle: _LegacyCheckpointMigrator_ class.
h2. Problem

_LegacyCheckpointMigrator.renameMigrationTable_ performs:

1. _deleteTable(checkpointTableName)_: drop the legacy table.
2. _waitForTableDeleted(checkpointTableName)_.
3. _createNewSchemaTable(checkpointTableName)_: recreate with the new schema 
under the same name.
4. _waitForTableActive(checkpointTableName)_.
5. _copyCheckpointItems(migrationTableName → checkpointTableName)_.
6. _deleteTable(migrationTableName)_.

If step 5 throws, the only surviving copy of the migrated state is in the 
{{_migration}} table. On restart, 
_KinesisShardManager.ensureCheckpointTableExists_ takes the _case NEW_ branch 
(the recreated table has the new schema) and calls 
_cleanupLingeringMigration_, which *deletes the migration table without 
rerunning the copy*. The "legacy checkpoint table retains original data" 
comment is wrong here: the legacy table was already deleted in step 1.

Result: silent loss of all per-shard checkpoint state; every shard restarts 
from {_}INITIAL_STREAM_POSITION{_}.
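
The sequence above can be reproduced with a minimal in-memory sketch. The maps stand in for DynamoDB tables, and every class, method, and table name below is hypothetical; this only models the behaviour described in this issue and is not the actual _LegacyCheckpointMigrator_ code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model: table name -> (shard id -> sequence number).
class FailedMigrationSketch {
    static final String CHECKPOINT = "checkpoints";
    static final String MIGRATION = "checkpoints_migration";

    // Steps 1-6 of the rename; failOnCopy simulates step 5 throwing.
    static void renameMigrationTable(Map<String, Map<String, String>> db, boolean failOnCopy) {
        db.remove(CHECKPOINT);                        // steps 1-2: drop the legacy table
        db.put(CHECKPOINT, new HashMap<>());          // steps 3-4: recreate with the new schema
        if (failOnCopy) {
            throw new IllegalStateException("Missing the key leaseKey in the item");
        }
        db.get(CHECKPOINT).putAll(db.get(MIGRATION)); // step 5: copy the migrated items
        db.remove(MIGRATION);                         // step 6: drop the migration table
    }

    // Restart behaviour as described above: the new schema is detected and the
    // lingering migration table is dropped without rerunning the copy.
    static void cleanupLingeringMigration(Map<String, Map<String, String>> db) {
        db.remove(MIGRATION);
    }

    // State before the rename: legacy table plus a fully populated migration table.
    static Map<String, Map<String, String>> freshDb() {
        Map<String, Map<String, String>> db = new HashMap<>();
        db.put(CHECKPOINT, new HashMap<>(Map.of("shard-0", "100", "shard-1", "200")));
        db.put(MIGRATION, new HashMap<>(Map.of("shard-0", "100", "shard-1", "200")));
        return db;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> db = freshDb();
        try {
            renameMigrationTable(db, true);
        } catch (IllegalStateException e) {
            System.out.println("step 5 failed: " + e.getMessage());
        }
        cleanupLingeringMigration(db); // simulated restart
        System.out.println("checkpoints after restart: " + db.get(CHECKPOINT)); // prints {}
    }
}
```

After the simulated restart the checkpoint table exists but is empty, and the migration table is gone, which matches the silent loss described here.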
h2. Observed trigger: DynamoDB request-router schema cache

After the original table has been recreated with a new key schema, writes to 
it can fail with the following error, which indicates that some layer of 
DynamoDB request processing still sees the old table schema:

{{DynamoDbException: One or more parameter values were invalid:}}
{{Missing the key leaseKey in the item (Status Code: 400)}}

The working assumption is that step 5 fails on a fraction of streams because 
the same-name delete and recreate races against DynamoDB's request-router 
cache. Per the [USENIX ATC 2022 
paper|https://cdn.amazon.science/33/9d/b77f13fe49a798ece85cf3f9be6d/amazon-dynamodb-a-scalable-predictably-performant-and-fully-managed-nosql-database-service.pdf]
 (section 6.6, Metadata availability), routers cache the table key schema. The 
metadata service updates immediately and _DescribeTable_ returns _ACTIVE_, but 
routers keep the old key schema in their cache. The first _PutItem_ hits the 
stale cache and is validated against the old schema.

_ValidationException_ is non-retryable, so step 5 terminates on the first item. 
(Any transient step-5 error reproduces the same outcome — the cache race is 
just the most reliable trigger.)

Observed in a real-world upgrade: 6 of 9 streams failed. Streams whose first 
put landed more than 50 ms after _ACTIVE_ succeeded; those whose first put 
landed within ~15–30 ms failed and lost state on restart.
h2. Proposed fix

In {_}KinesisShardManager.ensureCheckpointTableExists{_}, treat _case NEW_ + 
migration-table-present as an {*}incomplete migration{*}. A clean 
_renameMigrationTable_ run deletes the migration table at the end, so its 
presence proves a prior attempt failed between steps 3 and 6.

Replace _cleanupLingeringMigration_ in this branch with a recovery that:
 - Re-runs _copyCheckpointItems(migrationTableName, checkpointTableName)_ 
(idempotent: items are keyed on _streamName_ + _shardId_).
 - Deletes the lingering migration table only after the copy succeeds.
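
A minimal in-memory sketch of the proposed recovery, under the assumption that the copy can simply be repeated because items share the same key in both tables. The class, the method _recoverIfIncomplete_, and the table names are hypothetical and only illustrate the control flow; they are not the existing _KinesisShardManager_ API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model: table name -> (checkpoint key -> sequence number).
class MigrationRecoverySketch {

    // Idempotent: re-putting an item under the same key overwrites it with the same value.
    static void copyCheckpointItems(Map<String, String> from, Map<String, String> to) {
        to.putAll(from);
    }

    // Proposed handling of "case NEW": a lingering migration table means a prior
    // run failed between steps 3 and 6, so finish the copy before deleting it.
    // Returns true when an incomplete migration was recovered.
    static boolean recoverIfIncomplete(Map<String, Map<String, String>> db,
                                       String migrationTable, String checkpointTable) {
        Map<String, String> migration = db.get(migrationTable);
        if (migration == null) {
            return false; // no migration table: the previous migration finished cleanly
        }
        copyCheckpointItems(migration, db.get(checkpointTable));
        db.remove(migrationTable); // only after the copy has succeeded
        return true;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> db = new HashMap<>();
        // State left by a failed run: new table incomplete, migration table intact.
        db.put("checkpoints", new HashMap<>(Map.of("stream/shard-0", "100")));
        db.put("checkpoints_migration",
                new HashMap<>(Map.of("stream/shard-0", "100", "stream/shard-1", "200")));

        boolean first = recoverIfIncomplete(db, "checkpoints_migration", "checkpoints");
        boolean second = recoverIfIncomplete(db, "checkpoints_migration", "checkpoints");
        System.out.println("first call recovered: " + first);   // true
        System.out.println("second call recovered: " + second); // false
        System.out.println("items restored: " + db.get("checkpoints").size()); // 2
    }
}
```

Overwriting items in the new table is only safe while no consumer has advanced checkpoints there, which this sketch assumes because recovery runs before shard processing starts.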

h2. Impact

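For _TRIM_HORIZON_ initial checkpoint position, total checkpoint loss can mean 
reprocessing every record still retained in the stream (up to 24 h of records 
with the default retention period) and producing downstream duplicates.

For _LATEST_ initial checkpoint position, total checkpoint loss means 
potential data loss: records written between the last checkpoint and the 
restart are skipped.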



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
