rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r648597475



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -129,44 +142,66 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
     private static class NumberSource implements SourceFunction<Integer>, 
CheckpointedFunction {
 
         private static final long serialVersionUID = 1L;
+        private static int lastCheckpointValue;
         private ListState<Integer> valueState;
-        public static int lastCheckpointedValue;
+        private boolean isRunning = true;
+
+        public NumberSource() {
+            lastCheckpointValue = 0;
+        }
 
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
             Iterator<Integer> stateIt = valueState.get().iterator();
-            boolean isRecovered = stateIt.hasNext();
+            boolean isRecovered = lastCheckpointValue > 0;
 
             if (isRecovered) {
                 Integer lastValue = stateIt.next();

Review comment:
       When running locally, about 1 of 100 fails with:
   ```
   org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
        at akka.dispatch.OnComplete.internal(Future.scala:264)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, 
backoffTimeMS=0)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        ... 4 more
   Caused by: java.util.NoSuchElementException
        at java.util.ArrayList$Itr.next(ArrayList.java:864)
        at 
org.apache.flink.test.checkpointing.IgnoreInFlightDataITCase$NumberSource.run(IgnoreInFlightDataITCase.java:160)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:62)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
   ```
   
   I guess the reason is that the state is accessed (via iterator) without 
synchronization (using checkpoint lock).
   `SourceFunction.run()` is executed in a separate thread 
`LegacySourceFunctionThread`, and all state accesses, as well as data emission 
must be done in synchronized(checkpointLock).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to