[ https://issues.apache.org/jira/browse/BEAM-10189?focusedWorklogId=449440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-449440 ]
ASF GitHub Bot logged work on BEAM-10189:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jun/20 18:24
Start Date: 22/Jun/20 18:24
Worklog Time Spent: 10m
Work Description: y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647696671
> It is true that the fn protocol supports one cache token per handler (e.g. user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is. Cache tokens have been working fine so far. Could you provide some logs or test cases which show that there is a problem?
Last time I checked with @lukecwik, he mentioned that the SDK expects one global cache token per bundle for all user states, and one cache token per side input.
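The token contract described above can be sketched as follows. This is a minimal Python illustration, assuming a simplified, hypothetical `CacheToken` record and a hypothetical `build_bundle_cache_tokens` helper; it is not Beam's runner code, and the real Fn API message (`ProcessBundleRequest.CacheToken`) carries more structure.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class CacheToken:
    """Hypothetical, simplified stand-in for ProcessBundleRequest.CacheToken."""
    token: bytes
    # True if this token covers user state.
    user_state: bool = False
    # (transform_id, side_input_id) if this token covers a side input.
    side_input: Optional[Tuple[str, str]] = None


def build_bundle_cache_tokens(
        user_state_token: bytes,
        side_input_tokens: Dict[Tuple[str, str], bytes]) -> List[CacheToken]:
    """Hypothetical helper: the cache tokens a runner attaches to one bundle.

    One token covers *all* user states of the bundle; each side input
    gets its own token, keyed by (transform_id, side_input_id).
    """
    tokens = [CacheToken(token=user_state_token, user_state=True)]
    for side_input, token in sorted(side_input_tokens.items()):
        tokens.append(CacheToken(token=token, side_input=side_input))
    return tokens


# Regardless of how many user states the DoFn declares, the bundle gets
# one user-state token plus one token per side input.
tokens = build_bundle_cache_tokens(
    b"user-state-1", {("ParDo(Stateful)", "side0"): b"side-input-1"})
print(sum(t.user_state for t in tokens))  # 1
print(len(tokens))  # 2
```

Under this contract the number of declared user states never enters the calculation, which is why a DoFn with two user states should still see a single user-state token per bundle.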
The problem is that it seems we can't declare more than one user state in a stateful DoFn; otherwise the SDK fails, as in the `test_pardo_state_only` test I'm trying to update in this PR.
```
09:13:30 [flink-runner-job-invoker] ERROR org.apache.beam.runners.jobsubmission.JobInvocation - Error during job invocation test_pardo_state_only_1592842407.43_aafef3e7-73a3-4a63-be4c-4be6c6fc7b7d.
09:13:30 java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
09:13:30 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
09:13:30 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
09:13:30 at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864)
09:13:30 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194)
09:13:30 at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116)
09:13:30 at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83)
09:13:30 at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83)
09:13:30 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
09:13:30 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
09:13:30 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
09:13:30 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
09:13:30 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
09:13:30 at java.lang.Thread.run(Thread.java:748)
09:13:30 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
09:13:30 at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
09:13:30 at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
09:13:30 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
09:13:30 at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
09:13:30 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
09:13:30 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
09:13:30 at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
09:13:30 at akka.dispatch.OnComplete.internal(Future.scala:264)
09:13:30 at akka.dispatch.OnComplete.internal(Future.scala:261)
09:13:30 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
09:13:30 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
09:13:30 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
09:13:30 at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
09:13:30 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
09:13:30 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
09:13:30 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
09:13:30 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
09:13:30 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
09:13:30 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
09:13:30 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
09:13:30 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
09:13:30 at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
09:13:30 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
09:13:30 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
09:13:30 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
09:13:30 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
09:13:30 at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
09:13:30 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
09:13:30 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
09:13:30 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
09:13:30 at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
09:13:30 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
09:13:30 at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
09:13:30 Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
09:13:30 at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
09:13:30 at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
09:13:30 at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
09:13:30 at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
09:13:30 at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
09:13:30 at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
09:13:30 at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
09:13:30 at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
09:13:30 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
09:13:30 at java.lang.reflect.Method.invoke(Method.java:498)
09:13:30 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
09:13:30 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
09:13:30 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
09:13:30 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
09:13:30 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
09:13:30 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
09:13:30 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
09:13:30 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
09:13:30 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
09:13:30 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
09:13:30 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
09:13:30 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
09:13:30 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
09:13:30 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
09:13:30 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
09:13:30 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
09:13:30 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
09:13:30 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
09:13:30 ... 4 more
09:13:30 Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
09:13:30 File "apache_beam/runners/worker/sdk_worker.py", line 247, in _execute
09:13:30 response = task()
09:13:30 File "apache_beam/runners/worker/sdk_worker.py", line 304, in <lambda>
09:13:30 lambda: self.create_worker().do_instruction(request), request)
09:13:30 File "apache_beam/runners/worker/sdk_worker.py", line 473, in do_instruction
09:13:30 getattr(request, request_type), request.instruction_id)
09:13:30 File "apache_beam/runners/worker/sdk_worker.py", line 505, in process_bundle
09:13:30 instruction_id, request.cache_tokens):
09:13:30 File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
09:13:30 return self.gen.next()
09:13:30 File "apache_beam/runners/worker/sdk_worker.py", line 894, in process_instruction_id
09:13:30 assert not user_state_cache_token
09:13:30 AssertionError
```
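The failing check in the last frame can be illustrated with a small sketch. It assumes a simplified, hypothetical `CacheToken` record (not Beam's protobuf class) and a hypothetical `split_cache_tokens` helper. It only mirrors the invariant behind `assert not user_state_cache_token`: a bundle may carry at most one user-state token, so a runner that sent one token per declared user state would trip it. Beam's actual `sdk_worker.py` logic may differ in detail.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass(frozen=True)
class CacheToken:
    """Hypothetical, simplified stand-in for ProcessBundleRequest.CacheToken."""
    token: bytes
    user_state: bool = False
    side_input: Optional[Tuple[str, str]] = None


def split_cache_tokens(
        tokens: Iterable[CacheToken]
) -> Tuple[Optional[bytes], Dict[Tuple[str, str], bytes]]:
    """Hypothetical helper: separate the bundle's user-state token from
    its per-side-input tokens, rejecting a second user-state token."""
    user_state_cache_token: Optional[bytes] = None
    side_input_cache_tokens: Dict[Tuple[str, str], bytes] = {}
    for t in tokens:
        if t.user_state:
            # Plays the role of `assert not user_state_cache_token` from the
            # traceback, written as an explicit raise so it also fires under -O.
            if user_state_cache_token is not None:
                raise AssertionError("more than one user-state cache token")
            user_state_cache_token = t.token
        elif t.side_input is not None:
            side_input_cache_tokens[t.side_input] = t.token
    return user_state_cache_token, side_input_cache_tokens


# One global user-state token plus a side-input token: accepted.
print(split_cache_tokens([
    CacheToken(b"us", user_state=True),
    CacheToken(b"si", side_input=("t", "s")),
]))  # (b'us', {('t', 's'): b'si'})

# One token per declared user state (two states): rejected.
try:
    split_cache_tokens([
        CacheToken(b"state-a", user_state=True),
        CacheToken(b"state-b", user_state=True),
    ])
except AssertionError as e:
    print("AssertionError:", e)
```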
Issue Time Tracking
-------------------
Worklog Id: (was: 449440)
Time Spent: 3h 40m (was: 3.5h)
> Add ValueState to python sdk
> ----------------------------
>
> Key: BEAM-10189
> URL: https://issues.apache.org/jira/browse/BEAM-10189
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Yichi Zhang
> Assignee: Yichi Zhang
> Priority: P2
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)