[ https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xingbo Huang updated FLINK-29395:
---------------------------------
    Fix Version/s: 1.16.0
                       (was: 1.16.1)

> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
>                 Key: FLINK-29395
>                 URL: https://issues.apache.org/jira/browse/FLINK-29395
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
>            Reporter: Hong Liang Teoh
>            Assignee: Hong Liang Teoh
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.17.0, 1.15.3
>
> *Background*
> The consumer fails when an EFO record publisher uses a timestamp sentinel
> starting position and the first record batch is not empty but deaggregates
> to an empty batch. This can happen if the user explicitly specifies the
> explicitHashKey in the KPL but does not ensure that every record in an
> aggregated batch has the same explicitHashKey.
> When resharding occurs, an aggregated record batch can contain records whose
> explicitHashKey is outside the shard's hash key range. These records are
> dropped during deaggregation, which can produce exactly this situation: the
> record batch is not empty, but the deaggregated record batch is.
> The symptom is similar to the one seen in
> https://issues.apache.org/jira/browse/FLINK-20088.
> See
> [here|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]
> and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a
> more detailed explanation.
> *Replicate*
> Get the shard information:
> {code:bash}
> aws kinesis describe-stream --stream-name <stream_name>
> {
>     "StreamDescription": {
>         "Shards": [
>             ...
>             {
>                 "ShardId": "shardId-000000000037",
>                 "ParentShardId": "shardId-000000000027",
>                 "HashKeyRange": {
>                     "StartingHashKey": "272225893536750770770699685945414569164",
>                     "EndingHashKey": "340282366920938463463374607431768211455"
>                 }
>                 ...
>             },
>             {
>                 "ShardId": "shardId-000000000038",
>                 "ParentShardId": "shardId-000000000034",
>                 "AdjacentParentShardId": "shardId-000000000036",
>                 "HashKeyRange": {
>                     "StartingHashKey": "204169420152563078078024764459060926873",
>                     "EndingHashKey": "272225893536750770770699685945414569163"
>                 }
>                 ...
>             }
>         ]
>         ...
>     }
> }
> {code}
> Create an aggregated record containing two records whose explicit hash keys
> belong to different shards:
> {code:java}
> import com.amazonaws.kinesis.agg.RecordAggregator;
> import com.amazonaws.services.kinesis.AmazonKinesis;
> import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
>
> RecordAggregator aggregator = new RecordAggregator();
> String record1 = "RECORD_1";
> String record2 = "RECORD_2";
> // The explicit hash keys fall on different sides of the boundary between
> // shardId-000000000038 (ends at ...163) and shardId-000000000037 (starts at ...164)
> aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", record1.getBytes());
> aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", record2.getBytes());
> AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.defaultClient();
> kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));
> {code}
> Consume from the stream while specifying a timestamp such that the only
> record retrieved is the aggregated record above, as sketched below.
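>
> For completeness, a minimal sketch of that consuming side, assuming the
> FlinkKinesisConsumer from flink-connector-kinesis; the region and timestamp
> values are placeholders to adapt to where and when the aggregated record was
> written:
> {code:java}
> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
> import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
> import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
>
> public class Efo29395Repro {
>     public static void main(String[] args) throws Exception {
>         Properties config = new Properties();
>         config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder
>         // Use the EFO record publisher (the code path that fails)
>         config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
>         config.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-test-consumer");
>         // Start at a timestamp just before the aggregated record was written,
>         // so that record is the only one the subscription retrieves
>         config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
>         config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2022-09-23T00:00:00.000Z"); // placeholder
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.addSource(new FlinkKinesisConsumer<>("EFOStreamTest", new SimpleStringSchema(), config))
>                 .print();
>         env.execute("FLINK-29395 repro");
>     }
> }
> {code}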
> *Error*
> {code:java}
> java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
> 	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
> 	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
> 	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>
> *Solution*
> This is fixed by reusing the existing timestamp starting position while the
> tracked sequence number is still the AT_TIMESTAMP sentinel, i.e. until at
> least one record has actually been deaggregated and emitted.
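>
> For illustration, a sketch of that guard, assuming the publisher keeps the
> initial timestamp around; the class and member names below are hypothetical
> and not necessarily those used in the actual patch:
> {code:java}
> import java.util.Date;
>
> import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
> import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
> import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
>
> // Hypothetical helper; illustrates the condition, not the exact patch.
> class TimestampAwarePositionResolver {
>     private final Date initialTimestamp; // timestamp the consumer started from
>
>     TimestampAwarePositionResolver(Date initialTimestamp) {
>         this.initialTimestamp = initialTimestamp;
>     }
>
>     StartingPosition nextStartingPosition(SequenceNumber latestSequenceNumber) {
>         // While every deaggregated batch has been empty, the tracked sequence
>         // number is still the AT_TIMESTAMP sentinel. Passing a sentinel to
>         // continueFromSequenceNumber() throws the IllegalArgumentException
>         // above, so resubscribe from the original timestamp instead.
>         if (SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(latestSequenceNumber)) {
>             return StartingPosition.fromTimestamp(initialTimestamp);
>         }
>         return StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
>     }
> }
> {code}
> The point of the condition is that the subscription only continues from a
> concrete sequence number once one has actually been observed.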