siddhanta-rath opened a new issue, #32944:
URL: https://github.com/apache/beam/issues/32944
### What happened?
We have been running our Beam jobs on Google Cloud Dataflow for a while now
and are evaluating a migration to Flink.
All of our jobs are built with Beam SDK version 2.56.0.
During this exercise, we found that pipelines built with 2.56.0 fail to start
on FlinkRunner with Flink (v1.16.x), whereas the same pipelines start running
once we downgrade the Beam SDK to 2.49. Downgrading, however, costs us some of
the features offered in newer Beam SDKs (RequestResponseIO among others, which
is a critical component in one of our jobs).
We also tried running the `WordCount` example pipeline with each Beam version
from 2.49 through 2.56.0, using the matching runner version for each, on
Flink 1.16.x, but to no avail.
**These are the failure logs:**
`caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Pipeline execution failed
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107)
~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_372]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_372]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_372]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was
submitted in detached mode. Results of job execution, such as accumulators,
runtime, etc. are not available.
at
org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171)
~[?:?]
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
~[?:?]
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104)
~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_372]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_372]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_372]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
... 12 more
2024-07-25 11:45:12,956 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_372]
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
~[?:1.8.0_372]
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
~[?:1.8.0_372]
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_372]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_372]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_372]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_372]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_372]
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[?:1.8.0_372]
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[?:1.8.0_372]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[?:1.8.0_372]
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
[?:1.8.0_372]
Caused by:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Pipeline execution failed
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107)
~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_372]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_372]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_372]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was
submitted in detached mode. Results of job execution, such as accumulators,
runtime, etc. are not available.
at
org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171)
~[?:?]
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
~[?:?]
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104)
~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_372]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_372]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_372]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
~[flink-dist_2.12-1.14.3.jar:1.14.3]
... 12 more`
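Reading the trace above from the bottom up, the innermost `Caused by` is the `InvalidProgramException` thrown from `DetachedJobExecutionResult.getNetRuntime`, which is called by `FlinkPipelineExecutionEnvironment.createAttachedPipelineResult`. As a debugging aid, here is a minimal sketch of walking a `Throwable` chain so that a job's main method can log that root cause directly. The class name `RootCauseFinder` and the stand-in exceptions are hypothetical and are not part of Beam or Flink.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical helper for illustration; not part of Beam or Flink. */
public class RootCauseFinder {

    /** Follows getCause() links and returns the innermost throwable. */
    public static Throwable rootCause(Throwable t) {
        // Track visited throwables by identity to guard against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in exceptions that mimic the shape of the chain in the log above.
        Throwable inner = new IllegalStateException("Job was submitted in detached mode.");
        Throwable middle = new RuntimeException("Pipeline execution failed", inner);
        Throwable outer = new RuntimeException("The main method caused an error", middle);

        Throwable root = rootCause(outer);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

In a real job, one could wrap the `Pipeline.run()` call in a `try`/`catch` and pass the caught exception to `rootCause` before logging; that wiring is an assumption about how the job is structured, not something taken from our code.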
I have gone through the Beam community issues
([Issue #29660](https://github.com/apache/beam/issues/29660)) and the
release notes. Beyond the version compatibility mismatch (which is already
addressed in our case), I didn't find anything else that could directly cause
this issue.
Question: has anyone in the community experienced this issue and found a
workaround for running Beam pipelines built with Beam SDKs newer than 2.49.0
on Flink? In particular, is anybody successfully running Beam pipelines built
with SDK version 2.56 or newer?
Any help is appreciated!
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [X] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner