[
https://issues.apache.org/jira/browse/FLINK-32423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gyula Fora closed FLINK-32423.
------------------------------
Resolution: Won't Fix
The SQL runner is intended to be an example and not really a proper
all-encompassing solution :) I think this limitation is acceptable.
> Flink-sql-runner-example application fails if multiple execute() calls occur
> in one SQL file
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-32423
> URL: https://issues.apache.org/jira/browse/FLINK-32423
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: Guozhen Yang
> Priority: Not a Priority
>
> h2. Summary:
> flink-sql-runner-example application fails if multiple execute() calls occur
> in one SQL file
> h2. Background:
> We have a series of batch jobs running on a table partitioned by date. The
> jobs need to run sequentially in chronological order: only after batch job #1
> finishes the 2023-06-01 partition does batch job #2 start on the 2023-06-02
> partition. So we loop through the dates and submit multiple jobs from a
> single application, and the Flink application is deployed in application mode
> with HA turned off.
> According to the [Flink
> documentation|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#application-mode],
> Application Mode allows the submission of applications consisting of
> multiple jobs, but High Availability is not supported in these cases.
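> For illustration, a minimal sketch of the pattern (table names and the date
> range are hypothetical; the real jobs are more involved):
> {noformat}
> import java.time.LocalDate;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class SequentialPartitionRunner {
>     public static void main(String[] args) throws Exception {
>         TableEnvironment tEnv =
>                 TableEnvironment.create(EnvironmentSettings.inBatchMode());
>         // One INSERT per date partition; each becomes its own Flink job, so
>         // this main() triggers execute()/executeAsync() more than once.
>         for (LocalDate d = LocalDate.parse("2023-06-01");
>                 !d.isAfter(LocalDate.parse("2023-06-02"));
>                 d = d.plusDays(1)) {
>             tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table"
>                     + " WHERE dt = '" + d + "'")
>                 .await(); // block so partition N finishes before N+1 starts
>         }
>     }
> }
> {noformat}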
> h2. The problem:
> An application consisting of multiple jobs fails when the second job is
> executed.
> The stack trace is shown below:
> {noformat}
> 2023-06-21 03:21:44,720 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.16.2.jar:1.16.2]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>     ... 14 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
>     at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:217) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:205) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> {noformat}
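> The call path in the trace shows the failure mode: SqlRunner.main invokes
> executeSql once per statement, and the second query's executeAsync trips
> StreamContextEnvironment.validateAllowedExecution. A paraphrased sketch of
> what the example runner does (simplified; see the repository for the real
> SqlRunner):
> {noformat}
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class SqlRunnerSketch {
>     public static void main(String[] args) throws Exception {
>         // Naive statement splitting, for illustration only.
>         String script = Files.readString(Path.of(args[0]));
>         TableEnvironment tEnv =
>                 TableEnvironment.create(EnvironmentSettings.newInstance().build());
>         for (String statement : script.split(";")) {
>             if (!statement.isBlank()) {
>                 // Each SELECT triggers executeAsync() on the underlying
>                 // environment, so the second statement fails here.
>                 tEnv.executeSql(statement.trim());
>             }
>         }
>     }
> }
> {noformat}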
> h2. How to reproduce:
> 1. Start a minikube cluster.
> 2. Add a new script file _two-selects.sql_ to the
> [examples/flink-sql-runner-example/sql-scripts
> folder|https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example].
>
> The contents of _two-selects.sql_ are shown below.
> {noformat}
> select 1;
> select 1;
> {noformat}
> 3. Follow the
> [instructions|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/README.md]
> to build the flink-sql-runner-example image.
> 4. Use the minikube image load command to load the image.
> 5. Modify the [FlinkDeployment yaml
> file|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-example.yaml]:
> change spec.job.args to args:
> ["/opt/flink/usrlib/sql-scripts/two-selects.sql"] (a sketch of the modified
> job section follows these steps). Then apply the yaml file.
> 6. The application fails.
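> For reference, a sketch of the job section of the modified sql-example.yaml
> (only args changes; the surrounding values are copied from the upstream
> example and may differ in your checkout):
> {noformat}
> job:
>   jarURI: local:///opt/flink/usrlib/sql-runner.jar
>   args: ["/opt/flink/usrlib/sql-scripts/two-selects.sql"]
>   parallelism: 1
>   upgradeMode: stateless
> {noformat}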
> h2. Possible reason:
> According to the [flink-kubernetes-operator
> documentation|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/controller-flow/#application-reconciler],
> Flink by default generates deterministic job ids based on the clusterId.
> {quote}Flink by default generates deterministic jobids based on the clusterId
> (which is the CR name in our case). This causes checkpoint path conflicts if
> the job is ever restarted from an empty state (stateless upgrade). We
> therefore generate a random jobid to avoid this.
> {quote}
> I found that flink-kubernetes-operator always sets the job id when submitting
> an application. [The corresponding code, setJobIdIfNecessary, is
> here.|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L191C18-L191C37]
> But according to [Flink's
> code|https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L213],
> there are two situations (a compact paraphrase follows at the end of this
> section):
> 1. HA is not activated and the job id is not set when the application is
> submitted (lines 213 to 217): runApplicationAsync is called with
> enforceSingleJobExecution=false, so multi-job execution is viable.
> 2. Otherwise (lines 218 to 233): if the job id is not set, one is generated
> based on the cluster id. After the job id is fixed, runApplicationAsync is
> called with enforceSingleJobExecution=true, so multi-job execution is not
> viable.
> Since flink-kubernetes-operator always sets the job id when submitting an
> application, the condition of situation #1 never matches. So an application
> submitted with flink-kubernetes-operator cannot execute multiple jobs, even
> if it is deployed in application mode with HA turned off.
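> A compact paraphrase of that decision logic, reduced to the one flag that
> matters here (simplified for illustration; not the verbatim Flink source):
> {noformat}
> public class EnforceSingleJobSketch {
>     // Paraphrase of the branching in ApplicationDispatcherBootstrap; the
>     // real code also derives the generated job id from the cluster id.
>     static boolean enforceSingleJobExecution(boolean haActivated,
>                                              boolean jobIdConfigured) {
>         if (!haActivated && !jobIdConfigured) {
>             return false; // situation #1: multi-job execution is viable
>         }
>         // situation #2: a missing job id is first derived from the cluster
>         // id; only one execute()/executeAsync() call is then allowed
>         return true;
>     }
>
>     public static void main(String[] args) {
>         // The operator always configures a job id, so even with HA turned
>         // off the restrictive branch is taken:
>         System.out.println(enforceSingleJobExecution(false, true)); // true
>     }
> }
> {noformat}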
--
This message was sent by Atlassian Jira
(v8.20.10#820010)