siddhanta-rath opened a new issue, #32944:
URL: https://github.com/apache/beam/issues/32944

   ### What happened?
   
   We have been running our Beam jobs on Google Cloud Dataflow for a while and are now evaluating a migration to Flink.
   All of our jobs are built with Beam SDK version 2.56.0.
   
   During this exercise, we found that our jobs fail to start on the FlinkRunner with Flink 1.16.x when the pipelines are built with Beam SDK 2.56.0. When we downgrade the Beam SDK to 2.49, the jobs start running, but we lose some of the features offered in newer Beam SDKs (RequestResponseIO among others, which is a critical component in one of our jobs).
   
   We also tried running the `WordCount` example pipeline with Beam versions from 2.49 through 2.56.0, each with the matching runner version, on Flink 1.16.x, but to no avail.
   
   **These are the failure logs:**
   
   `caused by: org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error: Pipeline execution failed
       at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       ... 12 more
   Caused by: java.lang.RuntimeException: Pipeline execution failed
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) 
~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
       at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_372]
       at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_372]
       at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_372]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
       at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       ... 12 more
   Caused by: org.apache.flink.api.common.InvalidProgramException: Job was 
submitted in detached mode. Results of job execution, such as accumulators, 
runtime, etc. are not available.
       at 
org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171)
 ~[?:?]
       at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
 ~[?:?]
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) 
~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
       at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_372]
       at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_372]
       at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_372]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
       at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       ... 12 more
   2024-07-25 11:45:12,956 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
occurred in the cluster entrypoint.
   java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
       at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_372]
       at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_372]
       at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) 
~[?:1.8.0_372]
       at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
 ~[?:1.8.0_372]
       at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_372]
       at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
 ~[?:1.8.0_372]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_372]
       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_372]
       at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
 ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
       at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
       at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
 ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) 
[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
       at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
 [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_372]
       at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
[?:1.8.0_372]
       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
[?:1.8.0_372]
       at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
[?:1.8.0_372]
   Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
       ... 13 more
   Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error: Pipeline execution failed
       at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       ... 12 more
   Caused by: java.lang.RuntimeException: Pipeline execution failed
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) 
~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
       at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_372]
       at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_372]
       at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_372]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
       at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       ... 12 more
   Caused by: org.apache.flink.api.common.InvalidProgramException: Job was 
submitted in detached mode. Results of job execution, such as accumulators, 
runtime, etc. are not available.
       at 
org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171)
 ~[?:?]
       at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
 ~[?:?]
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) 
~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
       at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_372]
       at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_372]
       at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_372]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
       at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.14.3.jar:1.14.3]
       at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
 ~[flink-dist_2.12-1.14.3.jar:1.14.3]
       ... 12 more`
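
For anyone skimming the trace above, the innermost `Caused by:` entry is the `InvalidProgramException` about detached mode. Below is a minimal sketch of a helper that pulls that entry out of a log. `RootCauseFinder` and `innermostCause` are hypothetical names used only for illustration (they are not part of Beam or Flink), and the sketch assumes an unwrapped trace in which each `Caused by:` entry sits on a single line, unlike the mail-wrapped copy above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Beam or Flink. */
final class RootCauseFinder {

    // Matches a line of the form "Caused by: <exception class>[: <message>]".
    // Only spaces and tabs are skipped, so a match never spans two lines.
    private static final Pattern CAUSED_BY =
        Pattern.compile("(?m)^[ \\t]*Caused by:[ \\t]*([\\w.$]+)(?::[ \\t]*(.*))?$");

    /**
     * Returns "<class>: <message>" for the last "Caused by:" line in the
     * trace, or null if the trace contains no such line.
     */
    static String innermostCause(String stackTrace) {
        Matcher m = CAUSED_BY.matcher(stackTrace);
        String last = null;
        while (m.find()) {
            String message = m.group(2) == null ? "" : m.group(2).trim();
            last = message.isEmpty() ? m.group(1) : m.group(1) + ": " + message;
        }
        return last;
    }
}
```

Calling `RootCauseFinder.innermostCause(logText)` on an unwrapped copy of the log should return the last exception class and message in the chain, which is usually the most useful line to search for in existing issues.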
   
   I have gone through the Beam community issues ([#29660](https://github.com/apache/beam/issues/29660)) and the release notes. Beyond the compatibility mismatch (which is already addressed in our case), I didn't find anything else that could directly cause this issue.
   
   Question: has anyone in the community experienced this issue and found a workaround for running Beam pipelines built with Beam SDKs newer than 2.49.0 on Flink? We are particularly interested in hearing from anyone who is successfully running Beam pipelines built with SDK version 2.56 or newer.
   
   Any help is appreciated!
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [X] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
