[ https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hong Liang Teoh updated FLINK-29395:
------------------------------------
    Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel starting position and the first record batch is non-empty but deaggregates to an empty batch. This can happen if the user explicitly specifies the hash key in the KPL and does not ensure that the explicitHashKey of every record in an aggregated batch is the same.

When resharding occurs, an aggregated record can contain user records whose explicit hash keys fall outside the shard's hash key range. Those records are dropped during deaggregation, which produces exactly this situation: the record batch is not empty, but the deaggregated record batch is.

The symptom is similar to the one reported in https://issues.apache.org/jira/browse/FLINK-20088.

See [potential_data_loss.md|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md] and [kinesis-aggregation#11|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more detailed explanation.

*Replicate*

Get the shard information:
{code}
aws kinesis describe-stream --stream-name <stream_name>
{
    "StreamDescription": {
        "Shards": [
            ...
            {
                "ShardId": "shardId-000000000037",
                "ParentShardId": "shardId-000000000027",
                "HashKeyRange": {
                    "StartingHashKey": 
"272225893536750770770699685945414569164",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                }
            ...
            },
            {
                "ShardId": "shardId-000000000038",
                "ParentShardId": "shardId-000000000034",
                "AdjacentParentShardId": "shardId-000000000036",
                "HashKeyRange": {
                    "StartingHashKey": 
"204169420152563078078024764459060926873",
                    "EndingHashKey": "272225893536750770770699685945414569163"
                }
            ...
            }
        ]
...
    }
}{code}
Create an aggregated record containing two user records whose explicit hash keys belong to different shards:
{code:java}
import com.amazonaws.kinesis.agg.RecordAggregator;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

RecordAggregator aggregator = new RecordAggregator();
String record1 = "RECORD_1";
String record2 = "RECORD_2";

// Explicit hash keys on either side of the boundary between
// shardId-000000000038 and shardId-000000000037 (see describe-stream output above).
aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", record1.getBytes());
aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", record2.getBytes());

AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard().build();
kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));
{code}
Consume from the stream with an AT_TIMESTAMP starting position chosen so that the only record retrieved is the aggregate above, whose user records span multiple shards (see the configuration sketch below).
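
A minimal consumer-side sketch of this step, assuming the legacy FlinkKinesisConsumer of the affected versions; the region, timestamp, and EFO consumer name are placeholders.
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties config = new Properties();
config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
// Use the EFO record publisher and start from a timestamp just before the aggregate was written.
config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
config.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-timestamp-repro");
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2022-09-22T00:00:00.000+00:00");

env.addSource(new FlinkKinesisConsumer<>("EFOStreamTest", new SimpleStringSchema(), config))
        .print();
env.execute("FLINK-29395 repro");
{code}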

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
        at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
        at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
        at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
        at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
        at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
        at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
        at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
        at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position when this condition occurs, i.e. when the deaggregated batch is empty and the tracked sequence number is still the timestamp sentinel, instead of deriving a new starting position from the sentinel sequence number (which throws the exception above).
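
For illustration, the guard could look roughly like the sketch below; the variable names are simplified stand-ins for the publisher's tracked state, and this is not the actual patch.
{code:java}
// Sketch only. After deaggregating a non-empty batch down to zero records, the
// tracked sequence number is still the AT_TIMESTAMP sentinel, so deriving a new
// StartingPosition from it would throw. Keep the existing timestamp-based
// starting position and continue from it instead.
if (SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(lastSequenceNumber)) {
    // keep the current AT_TIMESTAMP starting position unchanged
} else {
    startingPosition = StartingPosition.continueFromSequenceNumber(lastSequenceNumber);
}
{code}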


> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
>                 Key: FLINK-29395
>                 URL: https://issues.apache.org/jira/browse/FLINK-29395
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
>            Reporter: Hong Liang Teoh
>            Assignee: Hong Liang Teoh
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.3, 1.16.1
>
>


