[
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Cranmer updated FLINK-29395:
----------------------------------
Fix Version/s: 1.15.3
> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
> Key: FLINK-29395
> URL: https://issues.apache.org/jira/browse/FLINK-29395
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
> Reporter: Hong Liang Teoh
> Assignee: Hong Liang Teoh
> Priority: Major
> Fix For: 1.16.0, 1.15.3
>
>
> *Background*
> The consumer fails when an EFO record publisher uses a timestamp sentinel
> starting position and the first record batch is empty. In that case the
> consumer tries to recalculate the start position from the timestamp sentinel,
> an operation that is not supported.
> This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088
> *Reproduction Steps*
> Set up an application consuming from Kinesis with the following properties and
> consume from an empty shard:
> {code:java}
> String format = "yyyy-MM-dd'T'HH:mm:ss";
> String date = new SimpleDateFormat(format).format(new Date());
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
> consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
> {code}
> *Error*
> {code:java}
> java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
>     at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
>     at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
>     at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>
> *Solution*
> The fix reuses the existing timestamp starting position in this condition,
> instead of recalculating it from the timestamp sentinel.
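
For readers following along, the idea behind the fix can be sketched roughly as below. This is a simplified illustration only and not the actual Flink code: `StartingPositionSketch`, `Position`, `Kind` and `nextPosition` are hypothetical names made up for this note, and only the sentinel string `AT_TIMESTAMP_SEQUENCE_NUM` is taken from the error message above. The sketch assumes that, after an empty first batch, the "last consumed sequence number" is still the timestamp sentinel.

```java
import java.time.Instant;

/** Simplified, hypothetical model of the starting-position logic; not the Flink classes. */
final class StartingPositionSketch {

    enum Kind { AT_TIMESTAMP, AFTER_SEQUENCE_NUMBER }

    /** Where the next shard subscription should begin. */
    static final class Position {
        final Kind kind;
        final Object value; // Instant for AT_TIMESTAMP, String for AFTER_SEQUENCE_NUMBER

        Position(Kind kind, Object value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Stand-in for the sentinel meaning "started at a timestamp, no record consumed yet".
    static final String AT_TIMESTAMP_SENTINEL = "AT_TIMESTAMP_SEQUENCE_NUM";

    /**
     * Computes the position for the next subscription.
     * If nothing has been consumed yet (the last sequence number is still the
     * timestamp sentinel), the current timestamp position is reused as-is rather
     * than being rebuilt from the sentinel, which is the unsupported operation.
     */
    static Position nextPosition(Position current, String lastSequenceNumber) {
        if (AT_TIMESTAMP_SENTINEL.equals(lastSequenceNumber)) {
            return current;
        }
        return new Position(Kind.AFTER_SEQUENCE_NUMBER, lastSequenceNumber);
    }

    public static void main(String[] args) {
        Position start = new Position(Kind.AT_TIMESTAMP, Instant.now());

        // Empty first batch: the position is unchanged.
        Position afterEmptyBatch = nextPosition(start, AT_TIMESTAMP_SENTINEL);
        System.out.println(afterEmptyBatch.kind);

        // "12345" is a made-up sequence number used purely for illustration.
        Position afterRecord = nextPosition(start, "12345");
        System.out.println(afterRecord.kind);
    }
}
```

With an empty batch the same `AT_TIMESTAMP` position object comes back, so the next subscription would start from the original timestamp; once a real sequence number has been seen, the sketch continues after that record.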