y1chi commented on pull request #11916: URL: https://github.com/apache/beam/pull/11916#issuecomment-647696671
> It is true that the fn protocol supports one cache token per handler (e.g. user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is. Cache tokens have been working fine so far. Could you provide some logs or test cases which show that there is a problem?

Last time I checked with @lukecwik, he mentioned that the SDK expects one global cache token per bundle for all user states, and one cache token per side input. The problem is that it seems we can't declare more than one user state in a stateful DoFn; otherwise the SDK fails, as in the test_pardo_state_only_test I'm trying to update in this PR.

```
09:13:30 [flink-runner-job-invoker] ERROR org.apache.beam.runners.jobsubmission.JobInvocation - Error during job invocation test_pardo_state_only_1592842407.43_aafef3e7-73a3-4a63-be4c-4be6c6fc7b7d.
09:13:30 java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
09:13:30     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
09:13:30     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
09:13:30     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864)
09:13:30     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194)
09:13:30     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116)
09:13:30     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83)
09:13:30     at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83)
09:13:30     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
09:13:30     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
09:13:30     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
09:13:30     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
09:13:30     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
09:13:30     at java.lang.Thread.run(Thread.java:748)
09:13:30 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
09:13:30     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
09:13:30     at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
09:13:30     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
09:13:30     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
09:13:30     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
09:13:30     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
09:13:30     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
09:13:30     at akka.dispatch.OnComplete.internal(Future.scala:264)
09:13:30     at akka.dispatch.OnComplete.internal(Future.scala:261)
09:13:30     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
09:13:30     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
09:13:30     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
09:13:30     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
09:13:30     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
09:13:30     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
09:13:30     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
09:13:30     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
09:13:30     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
09:13:30     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
09:13:30     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
09:13:30     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
09:13:30     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
09:13:30     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
09:13:30     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
09:13:30     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
09:13:30     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
09:13:30     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
09:13:30     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
09:13:30     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
09:13:30 Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
09:13:30     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
09:13:30     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
09:13:30     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
09:13:30     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
09:13:30     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
09:13:30     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
09:13:30     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
09:13:30     at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
09:13:30     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
09:13:30     at java.lang.reflect.Method.invoke(Method.java:498)
09:13:30     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
09:13:30     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
09:13:30     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
09:13:30     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
09:13:30     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
09:13:30     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
09:13:30     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
09:13:30     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
09:13:30     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
09:13:30     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
09:13:30     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
09:13:30     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
09:13:30     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
09:13:30     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
09:13:30     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
09:13:30     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
09:13:30     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
09:13:30     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
09:13:30     ... 4 more
09:13:30 Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 247, in _execute
09:13:30     response = task()
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 304, in <lambda>
09:13:30     lambda: self.create_worker().do_instruction(request), request)
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 473, in do_instruction
09:13:30     getattr(request, request_type), request.instruction_id)
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 505, in process_bundle
09:13:30     instruction_id, request.cache_tokens):
09:13:30   File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
09:13:30     return self.gen.next()
09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 894, in process_instruction_id
09:13:30     assert not user_state_cache_token
09:13:30 AssertionError
```
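
To make the failure mode concrete, here is a simplified sketch of how I read the SDK-side check. It is not the actual `sdk_worker.py` code: `CacheToken` and `split_cache_tokens` are made-up names, and the token layout is an assumption. The point is only that a second user-state token in the same bundle trips the assertion, while any number of side-input tokens is accepted.

```python
from collections import namedtuple

# Hypothetical stand-in for a cache token sent with a bundle request.
# This is NOT the Beam proto; the field names are assumptions for illustration.
CacheToken = namedtuple('CacheToken', ['kind', 'token', 'side_input_id'])


def split_cache_tokens(cache_tokens):
    """Separate one global user-state token from per-side-input tokens.

    Returns (user_state_token, {side_input_id: token}).
    Raises AssertionError if more than one user-state token is present.
    """
    user_state_token = None
    side_input_tokens = {}
    for cache_token in cache_tokens:
        if cache_token.kind == 'user_state':
            # Only a single user-state token is accepted per bundle,
            # mirroring the `assert not user_state_cache_token` in the trace.
            assert user_state_token is None, 'duplicate user state cache token'
            user_state_token = cache_token.token
        elif cache_token.kind == 'side_input':
            # Side inputs are keyed individually, so several may coexist.
            side_input_tokens[cache_token.side_input_id] = cache_token.token
        else:
            raise ValueError('unknown cache token kind: %r' % (cache_token.kind,))
    return user_state_token, side_input_tokens


if __name__ == '__main__':
    # One user-state token plus two side-input tokens is fine.
    print(split_cache_tokens([
        CacheToken('user_state', b'u1', None),
        CacheToken('side_input', b's1', 'side_a'),
        CacheToken('side_input', b's2', 'side_b'),
    ]))
    # Two user-state tokens (e.g. one per declared state) fail the assertion.
    try:
        split_cache_tokens([
            CacheToken('user_state', b'u1', None),
            CacheToken('user_state', b'u2', None),
        ])
    except AssertionError as e:
        print('AssertionError:', e)
```

If the runner sends one token per user state handler, as the quoted comment describes, a DoFn with two declared states would hit the second case.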
