Jiayi Zhao created BEAM-6915:
--------------------------------

             Summary: Issue when running a pipeline on a separate Flink cluster
                 Key: BEAM-6915
                 URL: https://issues.apache.org/jira/browse/BEAM-6915
             Project: Beam
          Issue Type: New Feature
          Components: runner-flink
            Reporter: Jiayi Zhao


First I ran a simple pipeline against the JobService endpoint started by:

  ./gradlew :beam-runners-flink_2.11-job-server:runShadow

That worked. Then I followed these instructions:

  _To run on a separate [Flink cluster|https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/setup_quickstart.html]:_

  _1. Start a Flink cluster which exposes the Rest interface on {{localhost:8081}} by default._

  _2. Start JobService with Flink Rest endpoint: {{./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081}}._

  _3. Submit the pipeline as above._

When I run the pipeline from another console, the JobService console shows the 
following errors. Any ideas?

 

_$ ./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081_
_Configuration on demand is an incubating feature._

_> Task :beam-runners-flink_2.11-job-server:runShadow_
_Listening for transport dt_socket at address: 5005_
_[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098_
_[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097_
_[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099_
_[grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_e3ca1015-d683-47df-beb5-104ccbb5a457_
_org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)_
 _at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)_
 _at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)_
 _at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)_
 _at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)_
 _at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)_
 _at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)_
 _at java.lang.Thread.run(Thread.java:748)_
_[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-jyzhao-0326181339-95e448eb_3b2a57a8-c8bc-463e-8be6-47890deb48b4_
_[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-jyzhao-0326181339-95e448eb_3b2a57a8-c8bc-463e-8be6-47890deb48b4_
_[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program._
_[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment._
_[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081._
_[flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism_
_[flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers_
_[flink-runner-job-invoker] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'_
_[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started._
_[flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 82e6ba00ed482f4d444a13fa5814d1e2 (detached: false)._
_[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint._
_[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete._
_[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-jyzhao-0326181339-95e448eb_3b2a57a8-c8bc-463e-8be6-47890deb48b4._
_org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result._
 _at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)_
 _at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)_
 _at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)_
 _at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)_
 _at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:216)_
 _at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:193)_
 _at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)_
 _at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:200)_
 _at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:88)_
 _at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:64)_
 _at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:73)_
 _at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)_
 _at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)_
 _at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)_
 _at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)_
 _at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)_
 _at java.lang.Thread.run(Thread.java:748)_
_Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph._
 _at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)_
 _at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)_
 _at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)_
 _at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)_
 _at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)_
 _at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)_
 _at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)_
 _at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)_
 _at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)_
 _at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)_
 _at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)_
 _at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)_
 _... 3 more_
_Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:_
_java.lang.NullPointerException_
 _at org.apache.flink.runtime.jobgraph.JobGraph.writeUserArtifactEntriesToConfiguration(JobGraph.java:586)_
 _at org.apache.flink.runtime.client.ClientUtils.setUserArtifactBlobKeys(ClientUtils.java:140)_
 _at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserArtifacts(ClientUtils.java:121)_
 _at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:78)_
 _at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:168)_
 _at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)_
 _at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)_
 _at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)_
 _at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)_
 _at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)_
 _at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)_
 _at java.lang.Thread.run(Thread.java:748)_

_End of exception on server side>]_
 _at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:349)_
 _at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:333)_
 _at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)_
 _at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)_
 _... 4 more_
_<============-> 98% EXECUTING [28s]_
_> :beam-runners-flink_2.11-job-server:runShadow_
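
The innermost exception above is a NullPointerException raised on the Flink server while it handles the submitted JobGraph. One thing that may be worth ruling out (an assumption; nothing in the log confirms it) is a version difference between the Flink client bundled in the job server and the Flink cluster listening on localhost:8081. A minimal Python sketch that reads the cluster version from the REST `/config` endpoint, assuming the response carries a "flink-version" field; all helper names are hypothetical:

```python
import json
import urllib.request


def rest_config_url(flink_master: str) -> str:
    """Build the URL of the Flink REST /config endpoint from host:port."""
    return f"http://{flink_master}/config"


def flink_version(config_json: str) -> str:
    """Extract the version string from a /config response body.

    Assumption: the body is JSON with a "flink-version" field.
    """
    return json.loads(config_json)["flink-version"]


def same_minor(a: str, b: str) -> bool:
    """True if two versions share major.minor, e.g. 1.5.0 and 1.5.6."""
    return a.split(".")[:2] == b.split(".")[:2]


def fetch_flink_version(flink_master: str = "localhost:8081") -> str:
    """Query a running cluster; needs network access to its REST port."""
    with urllib.request.urlopen(rest_config_url(flink_master), timeout=5) as resp:
        return flink_version(resp.read().decode("utf-8"))
```

Calling `fetch_flink_version()` while the cluster is up returns its version string, which can then be compared with the Flink version the job server was built against using `same_minor`. Which version that is would have to be looked up in the Beam build; it is not stated in this report.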


