Jiayi Zhao created BEAM-6915:
--------------------------------
Summary: Issue when running a pipeline on a separate Flink cluster
Key: BEAM-6915
URL: https://issues.apache.org/jira/browse/BEAM-6915
Project: Beam
Issue Type: New Feature
Components: runner-flink
Reporter: Jiayi Zhao
First, I ran a simple pipeline against the JobService endpoint started with:
{{./gradlew :beam-runners-flink_2.11-job-server:runShadow}}
That worked. Then I followed these steps:
_To run on a separate [Flink cluster|https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/setup_quickstart.html]:_
_1. Start a Flink cluster which exposes the Rest interface on {{localhost:8081}} by default._
_2. Start JobService with Flink Rest endpoint: {{./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081}}._
_3. Submit the pipeline as above._
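For reference, here is a minimal sketch of the kind of "simple pipeline" submission used in step 3. It assumes the Beam Python SDK and a JobService listening on localhost:8099, the port reported by JobServerDriver in the log. The report does not show the actual pipeline code, so the element-count pipeline and the helper names `portable_args` and `run` are hypothetical.

```python
# Hypothetical sketch: submit a trivial pipeline to the Beam JobService.
# Assumes the Beam Python SDK (apache_beam) is installed and the job server
# started with runShadow is listening on localhost:8099.

JOB_ENDPOINT = "localhost:8099"  # JobService address from the JobServerDriver log


def portable_args(job_endpoint):
    """Return command-line style options that target a portable job server."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
    ]


def run(job_endpoint=JOB_ENDPOINT):
    """Build and run a tiny element-count pipeline on the job server."""
    # Beam is imported here so portable_args() can be used without it.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(portable_args(job_endpoint))
    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         | "Create" >> beam.Create(["a", "b", "a"])
         | "CountPerElement" >> beam.combiners.Count.PerElement()
         | "Print" >> beam.Map(print))
```

With the job server from step 2 running, calling `run()` should submit the job through the same JobService endpoint that step 3 uses.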
When I then run the pipeline from another console, the JobService console shows the following errors. Any ideas?
_$ ./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081_
_Configuration on demand is an incubating feature._
_> Task :beam-runners-flink_2.11-job-server:runShadow_
_Listening for transport dt_socket at address: 5005_
_[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098_
_[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097_
_[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099_
_[grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_e3ca1015-d683-47df-beb5-104ccbb5a457_
_org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)_
_at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)_
_at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)_
_at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)_
_at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)_
_at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)_
_at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)_
_at java.lang.Thread.run(Thread.java:748)_
_[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-jyzhao-0326181339-95e448eb_3b2a57a8-c8bc-463e-8be6-47890deb48b4_
_[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-jyzhao-0326181339-95e448eb_3b2a57a8-c8bc-463e-8be6-47890deb48b4_
_[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program._
_[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment._
_[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081._
_[flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism_
_[flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers_
_[flink-runner-job-invoker] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'_
_[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started._
_[flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 82e6ba00ed482f4d444a13fa5814d1e2 (detached: false)._
_[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint._
_[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete._
_[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-jyzhao-0326181339-95e448eb_3b2a57a8-c8bc-463e-8be6-47890deb48b4._
_org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result._
_at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)_
_at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)_
_at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)_
_at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)_
_at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:216)_
_at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:193)_
_at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)_
_at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:200)_
_at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:88)_
_at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:64)_
_at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:73)_
_at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)_
_at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)_
_at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)_
_at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)_
_at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)_
_at java.lang.Thread.run(Thread.java:748)_
_Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph._
_at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)_
_at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)_
_at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)_
_at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)_
_at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)_
_at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)_
_at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)_
_at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)_
_at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)_
_at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)_
_at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)_
_at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)_
_... 3 more_
_Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:_
_java.lang.NullPointerException_
_at org.apache.flink.runtime.jobgraph.JobGraph.writeUserArtifactEntriesToConfiguration(JobGraph.java:586)_
_at org.apache.flink.runtime.client.ClientUtils.setUserArtifactBlobKeys(ClientUtils.java:140)_
_at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserArtifacts(ClientUtils.java:121)_
_at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:78)_
_at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:168)_
_at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)_
_at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)_
_at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)_
_at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)_
_at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)_
_at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)_
_at java.lang.Thread.run(Thread.java:748)_
_End of exception on server side>]_
_at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:349)_
_at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:333)_
_at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)_
_at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)_
_... 4 more_
_<============-> 98% EXECUTING [28s]_
_> :beam-runners-flink_2.11-job-server:runShadow_
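This is a possible next diagnostic, offered as a sketch and not as a confirmed cause. The NullPointerException is thrown on the cluster side, inside `JobGraph.writeUserArtifactEntriesToConfiguration`, while the Flink REST server at localhost:8081 handles the submitted JobGraph. One hypothesis is that the Flink client bundled in the job server and the Flink version running on the cluster differ. The snippet below asks the cluster which version it runs so the two can be compared. The `/config` endpoint and its `flink-version` field are assumptions based on Flink's REST API, and the helper names are hypothetical.

```python
# Hypothetical diagnostic: ask the Flink cluster which version it runs.
# Assumes Flink's REST API is served at localhost:8081 and that GET /config
# returns JSON with a "flink-version" field.
import json
import urllib.request

FLINK_REST_URL = "http://localhost:8081"


def parse_flink_version(body):
    """Extract the "flink-version" field from a /config JSON response."""
    config = json.loads(body)
    return config.get("flink-version", "unknown")


def fetch_flink_version(base_url=FLINK_REST_URL, timeout=5.0):
    """GET <base_url>/config and return the version the cluster reports."""
    with urllib.request.urlopen(base_url + "/config", timeout=timeout) as resp:
        return parse_flink_version(resp.read().decode("utf-8"))
```

For example, `print(fetch_flink_version())` prints whatever version string the cluster reports. Parsing is kept separate from the HTTP call so it can be checked without a running cluster.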
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)