[ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang updated FLINK-29395:
---------------------------------
    Fix Version/s: 1.16.0
                       (was: 1.16.1)

> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
>                 Key: FLINK-29395
>                 URL: https://issues.apache.org/jira/browse/FLINK-29395
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
>            Reporter: Hong Liang Teoh
>            Assignee: Hong Liang Teoh
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.17.0, 1.15.3
>
>
> *Background*
> The consumer fails when an EFO record publisher uses a timestamp sentinel 
> starting position and the first record batch is not empty, but the first 
> deaggregated record batch is. This can happen if the user explicitly 
> specifies the hash key in the KPL and does not ensure that the 
> explicitHashKey of every record in the aggregated batch is the same.
> When resharding occurs, the aggregated record batch can contain records 
> whose explicit hash keys fall outside the shard's hash key range. Those 
> records are dropped during deaggregation, which can produce exactly this 
> situation: the record batch is not empty, but the deaggregated record batch 
> is.
> The symptom is similar to the one described in 
> https://issues.apache.org/jira/browse/FLINK-20088.
> See [here|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md] 
> and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a 
> more detailed explanation.
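> To make the drop concrete, here is a minimal, self-contained sketch (our 
> illustration, not the connector's or the deaggregation library's actual 
> code) of the range check a deaggregator applies: a sub-record whose 
> explicitHashKey falls outside the consuming shard's HashKeyRange is 
> silently discarded, so a batch whose sub-records all miss the range 
> deaggregates to nothing.
> {code:java}
> import java.math.BigInteger;
> import java.util.Arrays;
> import java.util.List;
> 
> // Illustrative sketch only; class and method names are made up.
> public class DeaggregationDropSketch {
> 
>     // A consumer keeps a sub-record only if its explicit hash key is inside
>     // the shard's hash key range.
>     static boolean inRange(String hashKey, BigInteger start, BigInteger end) {
>         BigInteger key = new BigInteger(hashKey);
>         return key.compareTo(start) >= 0 && key.compareTo(end) <= 0;
>     }
> 
>     public static void main(String[] args) {
>         // HashKeyRange of shardId-000000000037 from the repro below
>         BigInteger start = new BigInteger("272225893536750770770699685945414569164");
>         BigInteger end = new BigInteger("340282366920938463463374607431768211455");
> 
>         // Explicit hash keys of the sub-records inside one aggregated batch
>         List<String> subRecordKeys = Arrays.asList(
>                 "272225893536750770770699685945414569162",  // outside -> dropped
>                 "272225893536750770770699685945414569165"); // inside  -> kept
> 
>         long kept = subRecordKeys.stream()
>                 .filter(k -> inRange(k, start, end))
>                 .count();
> 
>         // If every sub-record were dropped, the deaggregated batch would be
>         // empty even though the raw record batch was not.
>         System.out.println("kept " + kept + " of " + subRecordKeys.size());
>     }
> }
> {code}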
> *Replicate*
> Get shard information
> {code:java}
> aws kinesis describe-stream --stream-name <stream_name>
> {
>     "StreamDescription": {
>         "Shards": [
>             ...
>             {
>                 "ShardId": "shardId-000000000037",
>                 "ParentShardId": "shardId-000000000027",
>                 "HashKeyRange": {
>                     "StartingHashKey": "272225893536750770770699685945414569164",
>                     "EndingHashKey": "340282366920938463463374607431768211455"
>                 }
>             ...
>             },
>             {
>                 "ShardId": "shardId-000000000038",
>                 "ParentShardId": "shardId-000000000034",
>                 "AdjacentParentShardId": "shardId-000000000036",
>                 "HashKeyRange": {
>                     "StartingHashKey": "204169420152563078078024764459060926873",
>                     "EndingHashKey": "272225893536750770770699685945414569163"
>                 }
>             ...
>             }
>         ]
> ...
>     }
> }
> {code}
> Create an aggregate record with two records whose explicit hash keys belong 
> to different shards: 272225893536750770770699685945414569162 falls within 
> shardId-000000000038's hash key range, while 
> 272225893536750770770699685945414569165 falls within shardId-000000000037's.
> {code:java}
> import com.amazonaws.kinesis.agg.RecordAggregator;
> import com.amazonaws.services.kinesis.AmazonKinesis;
> import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
> 
> RecordAggregator aggregator = new RecordAggregator();
> String record1 = "RECORD_1";
> String record2 = "RECORD_2";
> 
> // The two explicit hash keys fall on opposite sides of the boundary between
> // shardId-000000000038 and shardId-000000000037
> aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", record1.getBytes());
> aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", record2.getBytes());
> 
> AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard().build();
> kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));
> {code}
> Consume from the stream while specifying a timestamp such that the only 
> record retrieved is the aggregate record above.
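> For reference, a job wired up roughly as in the sketch below reproduces the 
> failure; the region, timestamp and EFO consumer name are illustrative 
> placeholders, not values from this ticket.
> {code:java}
> import java.util.Properties;
> 
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
> import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
> import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
> 
> public class EfoTimestampRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         Properties config = new Properties();
>         config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder
> 
>         // Start reading at a timestamp just before the aggregate record was put
>         config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
>         config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2022-09-23T00:00:00.000Z");
> 
>         // Use the EFO record publisher
>         config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
>         config.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-repro-consumer");
> 
>         env.addSource(new FlinkKinesisConsumer<>("EFOStreamTest", new SimpleStringSchema(), config))
>            .print();
>         env.execute("efo-timestamp-repro");
>     }
> }
> {code}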
> *Error*
> {code:java}
> java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
>       at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
>       at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
>       at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
>       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
>       at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>  
> *Solution*
> This is fixed by reusing the existing timestamp starting position in this 
> condition, rather than deriving a new starting position from the 
> AT_TIMESTAMP sentinel sequence number.
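> A minimal sketch of the shape of that fix (assuming the connector's 
> SentinelSequenceNumber and StartingPosition helpers; nextSequenceNumber, 
> nextStartingPosition and initialTimestamp stand in for the publisher's 
> state, and this is not the committed patch):
> {code:java}
> // When deciding where to continue after a (possibly empty) batch:
> if (SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(nextSequenceNumber)) {
>     // Nothing was emitted yet, so the sequence number is still the
>     // AT_TIMESTAMP sentinel; keep the timestamp starting position instead of
>     // deriving one from the sentinel (which throws the error above).
>     nextStartingPosition = StartingPosition.fromTimestamp(initialTimestamp);
> } else {
>     nextStartingPosition = StartingPosition.continueFromSequenceNumber(nextSequenceNumber);
> }
> {code}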


