y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647696671


   > It is true that the fn protocol supports one cache token per handler (e.g. user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is. Cache tokens have been working fine so far. Could you provide some logs or test cases which show that there is a problem?
   
   Last time I checked with @lukecwik, he mentioned that the SDK expects one global cache token per bundle for all user state, and one cache token per side input.
   
   The problem is that it seems we can't declare more than one user state in a stateful DoFn; otherwise the SDK fails, as in the test_pardo_state_only_test I'm trying to update in this PR.
   
   ```
   09:13:30 [flink-runner-job-invoker] ERROR org.apache.beam.runners.jobsubmission.JobInvocation - Error during job invocation test_pardo_state_only_1592842407.43_aafef3e7-73a3-4a63-be4c-4be6c6fc7b7d.
   09:13:30 java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   09:13:30     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   09:13:30     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   09:13:30     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864)
   09:13:30     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194)
   09:13:30     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116)
   09:13:30     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83)
   09:13:30     at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83)
   09:13:30     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
   09:13:30     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
   09:13:30     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
   09:13:30     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   09:13:30     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   09:13:30     at java.lang.Thread.run(Thread.java:748)
   09:13:30 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   09:13:30     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
   09:13:30     at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
   09:13:30     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
   09:13:30     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
   09:13:30     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   09:13:30     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
   09:13:30     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
   09:13:30     at akka.dispatch.OnComplete.internal(Future.scala:264)
   09:13:30     at akka.dispatch.OnComplete.internal(Future.scala:261)
   09:13:30     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
   09:13:30     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
   09:13:30     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
   09:13:30     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
   09:13:30     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
   09:13:30     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
   09:13:30     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
   09:13:30     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
   09:13:30     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
   09:13:30     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
   09:13:30     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
   09:13:30     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
   09:13:30     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
   09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
   09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
   09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
   09:13:30     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
   09:13:30     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
   09:13:30     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
   09:13:30     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
   09:13:30     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   09:13:30     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   09:13:30     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   09:13:30     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   09:13:30 Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   09:13:30     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
   09:13:30     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
   09:13:30     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
   09:13:30     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
   09:13:30     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
   09:13:30     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
   09:13:30     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
   09:13:30     at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
   09:13:30     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   09:13:30     at java.lang.reflect.Method.invoke(Method.java:498)
   09:13:30     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
   09:13:30     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
   09:13:30     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   09:13:30     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   09:13:30     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   09:13:30     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   09:13:30     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   09:13:30     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   09:13:30     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   09:13:30     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   09:13:30     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   09:13:30     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   09:13:30     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   09:13:30     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   09:13:30     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   09:13:30     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   09:13:30     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   09:13:30     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   09:13:30     ... 4 more
   09:13:30 Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 247, in _execute
   09:13:30     response = task()
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 304, in <lambda>
   09:13:30     lambda: self.create_worker().do_instruction(request), request)
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 473, in do_instruction
   09:13:30     getattr(request, request_type), request.instruction_id)
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 505, in process_bundle
   09:13:30     instruction_id, request.cache_tokens):
   09:13:30   File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
   09:13:30     return self.gen.next()
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 894, in process_instruction_id
   09:13:30     assert not user_state_cache_token
   09:13:30 AssertionError
   ```
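   For context, the failing `assert not user_state_cache_token` enforces exactly the expectation described above: at most one user-state cache token per bundle, while side-input tokens are tracked per side input. Here is a minimal sketch of that invariant; `CacheTokenValidator` and `add_tokens` are hypothetical names for illustration, not the actual sdk_worker code:

```python
# Illustrative sketch (NOT the actual Beam sdk_worker implementation) of the
# invariant behind the AssertionError above: the SDK harness accepts at most
# ONE user-state cache token per bundle, while side-input cache tokens are
# keyed per (transform, side input).


class CacheTokenValidator:
    def __init__(self):
        self.user_state_cache_token = None
        self.side_input_cache_tokens = {}

    def add_tokens(self, cache_tokens):
        for token in cache_tokens:
            if token["type"] == "user_state":
                # Mirrors the assertion in the log: this fails as soon as a
                # runner sends a second user-state token in the same bundle.
                assert self.user_state_cache_token is None
                self.user_state_cache_token = token["token"]
            else:
                key = (token["transform_id"], token["side_input_id"])
                self.side_input_cache_tokens[key] = token["token"]
```

   So a runner that emits one cache token per declared user state, instead of a single global token covering all user state in the bundle, would trip the assertion on the second token.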
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]