[ https://issues.apache.org/jira/browse/BEAM-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483174#comment-17483174 ]

Janek Bevendorff commented on BEAM-8980:
----------------------------------------

This is definitely still an issue and has not been solved. I am running a Python
Beam pipeline on Flink (portable runner with a Beam JobServer), and every time
I use 8 or more concurrent TaskManagers, my job throws a seemingly unrelated
gRPC exception after finishing a global combine. With 1-4 TaskManagers, I can
hardly reproduce it.

The exception causes the job either to fail or to restart from scratch, only to
fail again eventually. I don't have any recovery options enabled in Flink at the
moment, so I am not sure which part is being retried or why. The restart happens
only occasionally, and the job usually fails soon afterwards.

Especially when the job runs for longer periods of time, the last TaskManager
can also stay stuck in the RUNNING state indefinitely, long after the actual
job has finished.

At the end of the global combine phase, I get the following exception in the 
TaskManager history:
{code:java}
2022-01-27 14:15:09,260 WARN  org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Error cleaning up servers urn: "beam:env:external:v1"
payload: "\n\021\n\017localhost:50000"
capabilities: "beam:coder:bytes:v1"
capabilities: "beam:coder:string_utf8:v1"
capabilities: "beam:coder:kv:v1"
capabilities: "beam:coder:bool:v1"
capabilities: "beam:coder:varint:v1"
capabilities: "beam:coder:double:v1"
capabilities: "beam:coder:iterable:v1"
capabilities: "beam:coder:timer:v1"
capabilities: "beam:coder:interval_window:v1"
capabilities: "beam:coder:length_prefix:v1"
capabilities: "beam:coder:global_window:v1"
capabilities: "beam:coder:windowed_value:v1"
capabilities: "beam:coder:param_windowed_value:v1"
capabilities: "beam:coder:state_backed_iterable:v1"
capabilities: "beam:coder:custom_window:v1"
capabilities: "beam:coder:row:v1"
capabilities: "beam:coder:sharded_key:v1"
capabilities: "beam:protocol:progress_reporting:v0"
capabilities: "beam:protocol:harness_monitoring_infos:v1"
capabilities: "beam:protocol:worker_status:v1"
capabilities: "beam:combinefn:packed_python:v1"
capabilities: "beam:version:sdk_base:apache/beam_python3.8_sdk:2.35.0"
capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
capabilities: "beam:transform:to_string:v1"
dependencies {
  type_urn: "beam:artifact:type:file:v1"
  type_payload: "\n\207\001/tmp/beam-artifact-staging/5b95d60d1d41127a9221f18e9f6e07e3c6c231ab7a10db788b8928cd1afd7c44/1-ref_Environment_default_e-workflow.tar.gz"
  role_urn: "beam:artifact:role:staging_to:v1"
  role_payload: "\n\017workflow.tar.gz"
}
java.lang.IllegalStateException: call already closed
    at org.apache.beam.vendor.grpc.v1p36p0.com.google.common.base.Preconditions.checkState(Preconditions.java:508) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:209) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:202) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:380) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.state.GrpcStateService.close(GrpcStateService.java:63) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.sdk.fn.server.GrpcFnServer.close(GrpcFnServer.java:156) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:268) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.close(FlinkExecutableStageFunction.java:373) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:519) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
{code}
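Reading this trace from the bottom up: closing the FlinkExecutableStageFunction releases the executable stage context, DefaultJobBundleFactory.close() invalidates its environment cache, and the cache's removal listener unrefs the wrapped SDK harness client, which closes its gRPC servers. GrpcStateService.close() then calls onCompleted() on a state stream whose gRPC call was already closed, so the precondition check in ServerCallImpl.closeInternal() fails with "call already closed". The following is a hypothetical, simplified model of that double close using only the JDK. FakeServerCall and IdempotentCloser are illustrative names, not Beam or gRPC classes, and the guard only shows a common pattern for making cleanup idempotent; it is not an actual Beam fix.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a server-side gRPC call: a second
// close() fails a precondition, mirroring the "call already closed"
// IllegalStateException thrown from ServerCallImpl.closeInternal in the trace.
// Not thread-safe; it only models the single-threaded double-close case.
class FakeServerCall {
  private boolean closed = false;

  void close() {
    if (closed) {
      throw new IllegalStateException("call already closed");
    }
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

// Hypothetical guard that turns repeated close() calls into no-ops, so a
// cleanup path running after the call was closed elsewhere does not throw.
class IdempotentCloser implements AutoCloseable {
  private final FakeServerCall call;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  IdempotentCloser(FakeServerCall call) {
    this.call = call;
  }

  @Override
  public void close() {
    // compareAndSet lets only the first caller through this guard; the
    // isClosed() check skips calls that some other path already closed.
    if (closed.compareAndSet(false, true) && !call.isClosed()) {
      call.close();
    }
  }
}
```

With the guard in place, a second cleanup path becomes a no-op instead of failing. Note that DefaultJobBundleFactory logs this particular exception at WARN level while cleaning up.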
The Flink JobManager log shows the following:
{code:java}
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
    at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
    at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:103)
    at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:115)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:718)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:362)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:267)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
        at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
        at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.close(BeamFnDataSizeBasedBufferingOutboundObserver.java:97)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.close(SdkHarnessClient.java:729)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:492)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:268)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:268)
        ... 6 more
        Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
            at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:516)
            ... 9 more
{code}
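In this second trace, the primary exception comes from processElements(): flushing buffered data to the SDK harness fails because the data stream had already been cancelled from the other side, and the gRPC message itself says that registering a handler via ServerCallStreamObserver.setOnCancelHandler() disables the exception. The Suppressed entries are failures raised while the bundle was being closed; the $closeResource frames suggest a try-with-resources block, which attaches exceptions thrown by close() to the primary one. The sketch below reproduces that shape with plain JDK classes. CallCancelledException, FakeStreamObserver, FakeBundle and CancelDemo are hypothetical names, and this is a simplified model of the behavior the log message describes, not gRPC's or Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for gRPC's StatusRuntimeException with status CANCELLED.
class CallCancelledException extends RuntimeException {
  CallCancelledException() {
    super("CANCELLED: call already cancelled");
  }
}

// Hypothetical, simplified server-streaming observer: once the client cancels,
// onNext() throws unless an on-cancel handler was registered, in which case
// late writes are silently dropped.
class FakeStreamObserver {
  private boolean cancelled = false;
  private Runnable onCancelHandler = null;
  final List<String> sent = new ArrayList<>();

  void setOnCancelHandler(Runnable handler) {
    this.onCancelHandler = handler;
  }

  // Simulates the client side (here: the SDK harness) cancelling the stream.
  void cancel() {
    cancelled = true;
    if (onCancelHandler != null) {
      onCancelHandler.run();
    }
  }

  void onNext(String value) {
    if (cancelled) {
      if (onCancelHandler == null) {
        throw new CallCancelledException();
      }
      return; // Handler registered: drop the write instead of throwing.
    }
    sent.add(value);
  }
}

// Hypothetical active bundle: close() flushes remaining buffered data,
// so it writes to the cancelled stream a second time.
class FakeBundle implements AutoCloseable {
  private final FakeStreamObserver out;

  FakeBundle(FakeStreamObserver out) {
    this.out = out;
  }

  void process(String element) {
    out.onNext(element);
  }

  @Override
  public void close() {
    out.onNext("final flush");
  }
}

class CancelDemo {
  // Processes one element inside try-with-resources and returns the exception
  // that escapes (null if none). If both process() and close() throw, the
  // close() failure is attached to the first one via addSuppressed().
  static RuntimeException runBundle(FakeStreamObserver out) {
    try (FakeBundle bundle = new FakeBundle(out)) {
      bundle.process("element");
      return null;
    } catch (RuntimeException e) {
      return e;
    }
  }
}
```

Against a cancelled stream without a handler, runBundle() returns a CallCancelledException carrying one suppressed CallCancelledException from close(), which mirrors the nesting above; with a no-op handler registered, the late writes are dropped and nothing is thrown. Silencing the exception this way would not by itself explain why the stream was cancelled. Read this way, the "Processing bundle failed" IllegalStateException appears only as a nested suppressed entry, which suggests it is raised while cleaning up after the cancelled stream.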

> Running GroupByKeyLoadTest on Portable Flink fails
> --------------------------------------------------
>
>                 Key: BEAM-8980
>                 URL: https://issues.apache.org/jira/browse/BEAM-8980
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, testing
>            Reporter: Michał Walenia
>            Priority: P2
>
> When running a GBK load test using the Java harness image and a JobServer
> image generated from master, the load test fails with a cryptic exception:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Invalid job state: FAILED.
> 11:45:31      at org.apache.beam.sdk.loadtests.JobFailure.handleFailure(JobFailure.java:55)
> 11:45:31      at org.apache.beam.sdk.loadtests.LoadTest.run(LoadTest.java:106)
> 11:45:31      at org.apache.beam.sdk.loadtests.CombineLoadTest.run(CombineLoadTest.java:66)
> 11:45:31      at org.apache.beam.sdk.loadtests.CombineLoadTest.main(CombineLoadTest.java:169)
> {code}
>  
> After some investigation, I found a stacktrace of the error:
> {code:java}
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
>     at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
>     at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:339)
>     at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:90)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:102)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:278)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:201)
>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
>     Suppressed: org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
>         at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
>         at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:339)
>         at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
>         at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.close(BeamFnDataSizeBasedBufferingOutboundObserver.java:84)
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:298)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:202)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:202)
>         ... 6 more
>         Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
>             at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:320)
>             ... 8 more
> {code}
> It seems that the core issue is an IllegalStateException thrown from
> SdkHarnessClient.java:320, related to BEAM-3962.
> It is important to note that the stack trace above comes from the Flink
> cluster, not from the Gradle job that was executed.
> The link to the Jenkins job is here:
> [https://builds.apache.org/job/beam_LoadTests_Java_GBK_Flink_Batch_PR/28/console]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
