Danny Cranmer created FLINK-20088:
-------------------------------------
Summary: [Kinesis][Polling] Issue using Polling consumer at
timestamp with empty shard
Key: FLINK-20088
URL: https://issues.apache.org/jira/browse/FLINK-20088
Project: Flink
Issue Type: Improvement
Components: Connectors / Kinesis
Reporter: Danny Cranmer
Fix For: 1.12.0
*Background*
The consumer fails when a Polling record publisher uses a timestamp sentinel
starting position and the first record batch is empty. This is because the
consumer tries to recalculate the start position from the timestamp sentinel,
this operation is not supported.
*Reproduction Steps*
Setup an application consuming from Kinesis with following properties and
consume from an empty shard:
{code:java}
String format = "yyyy-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"AT_TIMESTAMP"); {code}
*Error*
{code:java}
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
at akka.dispatch.OnComplete.internal(Future.scala:264) at
akka.dispatch.OnComplete.internal(Future.scala:261) at
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at
scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at
scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused
by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at
akka.actor.Actor$class.aroundReceive(Actor.scala) at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
akka.actor.ActorCell.invoke(ActorCell.scala:561) at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
akka.dispatch.Mailbox.run(Mailbox.scala:225) at
akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 moreCaused by:
java.lang.IllegalArgumentException: Unexpected sentinel type:
AT_TIMESTAMP_SEQUENCE_NUM at
software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:107)
at
software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSequenceNumber(StartingPosition.java:90)
at
software.amazon.kinesis.connectors.flink.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:73)
at
software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:113)
at
software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:98)
at
software.amazon.kinesis.connectors.flink.internals.ShardConsumer.run(ShardConsumer.java:108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at
java.util.concurrent.FutureTask.run(FutureTask.java) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) {code}
*Solution*
This is fixed by reusing the existing timestamp starting position in this
condition.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)