[
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hong Liang Teoh updated FLINK-29395:
------------------------------------
Fix Version/s: 1.16.0
(was: 1.12.0)
Affects Version/s: 1.15.2
1.14.5
1.13.6
1.12.7
Description:
*Background*
The consumer fails when an EFO record publisher uses a timestamp sentinel starting position and the first record batch is empty. In that case the consumer tries to recalculate the starting position from the timestamp sentinel, which is not a supported operation.
This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088
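To make the failure mode concrete, below is a small, self-contained sketch of the logic involved. The class, enum and method names are simplified stand-ins, not the connector's actual StartingPosition/SentinelSequenceNumber code: after an empty first batch, the "last consumed" sequence number is still the AT_TIMESTAMP sentinel, and mapping that sentinel back to a starting position throws.
{code:java}
// Simplified, hypothetical model of the failure path; the enum and methods below
// are illustrative stand-ins for the connector's classes, not its actual implementation.
public class SentinelRestartSketch {

    enum Sentinel { LATEST_SEQUENCE_NUM, EARLIEST_SEQUENCE_NUM, AT_TIMESTAMP_SEQUENCE_NUM }

    // Maps a sentinel "sequence number" back to a shard starting position.
    // There is no branch for the timestamp sentinel, which is what triggers the bug.
    static String startingPositionFromSentinel(Sentinel sentinel) {
        switch (sentinel) {
            case LATEST_SEQUENCE_NUM:
                return "LATEST";
            case EARLIEST_SEQUENCE_NUM:
                return "TRIM_HORIZON";
            default:
                throw new IllegalArgumentException("Unexpected sentinel type: " + sentinel);
        }
    }

    public static void main(String[] args) {
        // The shard was subscribed to at a timestamp and the first batch was empty,
        // so the last "consumed" sequence number is still the timestamp sentinel.
        Sentinel lastConsumed = Sentinel.AT_TIMESTAMP_SEQUENCE_NUM;

        // Recomputing the next starting position from that sentinel fails,
        // analogous to the IllegalArgumentException in the stack trace below.
        startingPositionFromSentinel(lastConsumed);
    }
}
{code}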
*Reproduction Steps*
Set up an application consuming from Kinesis with the following properties and consume from an empty shard:
{code:java}
// Use the current time as the AT_TIMESTAMP start position, so the first record batch from the shard is empty.
String format = "yyyy-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
// Use the enhanced fan-out (EFO) record publisher.
consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO"); {code}
*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
    at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
    at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
    at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829) {code}
*Solution*
This is fixed by reusing the existing timestamp starting position while the record batch is still empty, instead of recalculating the position from the sentinel.
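In terms of the simplified sketch in the Background section above, the fix is roughly the following guard (illustrative only, not the actual connector patch):
{code:java}
// Illustrative shape of the fix, continuing the simplified SentinelRestartSketch above;
// this is not the actual connector code.
static String nextStartingPosition(Sentinel lastConsumed, String initialTimestamp) {
    if (lastConsumed == Sentinel.AT_TIMESTAMP_SEQUENCE_NUM) {
        // Nothing was consumed yet: keep restarting from the originally configured
        // timestamp instead of deriving a position from the sentinel.
        return "AT_TIMESTAMP(" + initialTimestamp + ")";
    }
    // Otherwise fall back to the existing sentinel-to-position mapping.
    return startingPositionFromSentinel(lastConsumed);
}
{code}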
was:
*Background*
The consumer fails when a Polling record publisher uses a timestamp sentinel starting position and the first record batch is empty. In that case the consumer tries to recalculate the starting position from the timestamp sentinel, which is not a supported operation.
*Reproduction Steps*
Set up an application consuming from Kinesis with the following properties and consume from an empty shard:
{code:java}
String format = "yyyy-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); {code}
*Error*
{code:java}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
    at akka.actor.Actor$class.aroundReceive(Actor.scala)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
    at software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:107)
    at software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSequenceNumber(StartingPosition.java:90)
    at software.amazon.kinesis.connectors.flink.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:73)
    at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:113)
    at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:98)
    at software.amazon.kinesis.connectors.flink.internals.ShardConsumer.run(ShardConsumer.java:108)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
*Solution*
This is fixed by reusing the existing timestamp starting position while the record batch is still empty, instead of recalculating the position from the sentinel.
> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
> Key: FLINK-29395
> URL: https://issues.apache.org/jira/browse/FLINK-29395
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
> Reporter: Hong Liang Teoh
> Assignee: Danny Cranmer
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
>
> *Background*
> The consumer fails when an EFO record publisher uses a timestamp sentinel starting position and the first record batch is empty. In that case the consumer tries to recalculate the starting position from the timestamp sentinel, which is not a supported operation.
> This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088
> *Reproduction Steps*
> Set up an application consuming from Kinesis with the following properties and consume from an empty shard:
> {code:java}
> // Use the current time as the AT_TIMESTAMP start position, so the first record batch from the shard is empty.
> String format = "yyyy-MM-dd'T'HH:mm:ss";
> String date = new SimpleDateFormat(format).format(new Date());
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
> // Use the enhanced fan-out (EFO) record publisher.
> consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO"); {code}
> *Error*
> {code:java}
> java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
>     at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
>     at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
>     at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829) {code}
>
> *Solution*
> This is fixed by reusing the existing timestamp starting position while the record batch is still empty, instead of recalculating the position from the sentinel.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)