[ 
https://issues.apache.org/jira/browse/FLINK-32423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-32423.
------------------------------
    Resolution: Won't Fix

The SQL runner is intended to be an example, not a proper all-encompassing 
solution :) I think this limitation is acceptable.

> Flink-sql-runner-example application fails if multiple execute() called in 
> one sql file
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-32423
>                 URL: https://issues.apache.org/jira/browse/FLINK-32423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: Guozhen Yang
>            Priority: Not a Priority
>
> h2. Summary:
> flink-sql-runner-example application fails if multiple execute() called in 
> one sql file
> h2. Background:
> We have a series of batch jobs running on a table partitioned by date. The 
> jobs need to be run sequentially in chronological order, which means that 
> batch job #2, processing the 2023-06-02 partition, starts only after batch 
> job #1 has finished processing the 2023-06-01 partition. So we loop through 
> the dates and submit multiple jobs in a single application, and the Flink 
> application is deployed in application mode with HA turned off.
> According to the [Flink 
> documentation|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#application-mode],
>  Application Mode allows the submission of applications consisting of 
> multiple jobs, but High-Availability is not supported in these cases.
> h2. The problem:
> The application, which consists of multiple jobs, fails when the second job 
> is executed.
> The stack trace is shown below:
> {noformat}
> 2023-06-21 03:21:44,720 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.16.2.jar:1.16.2]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>     ... 14 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
>     at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:217) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:205) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> {noformat}
> h2. How to reproduce:
> 1. Start a minikube cluster
> 2. Add a new script file _two-selects.sql_ to the 
> [examples/flink-sql-runner-example/sql-scripts 
> folder|https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example].
>  
> The contents of _two-selects.sql_ are shown below.
> {noformat}
> select 1;
> select 1;
> {noformat}
> 3. Follow the 
> [instructions|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/README.md]
>  to build the flink-sql-runner-example image.
> 4. Use the minikube image load command to load the image.
> 5. Modify the [flinkdep yaml 
> file|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-example.yaml],
>  changing spec.job.args to args: 
> ["/opt/flink/usrlib/sql-scripts/two-selects.sql"]. Then apply the flinkdep 
> yaml file.
> 6. The application fails.
> h2. Possible reason:
> According to the [flink-kubernetes-operator 
> documentation|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/controller-flow/#application-reconciler],
>  Flink by default generates deterministic job ids based on the clusterId.
> {quote}Flink by default generates deterministic jobids based on the clusterId 
> (which is the CR name in our case). This causes checkpoint path conflicts if 
> the job is ever restarted from an empty state (stateless upgrade). We 
> therefore generate a random jobid to avoid this.
> {quote}
> I found that flink-kubernetes-operator always sets the job id when 
> submitting an application. [The corresponding code of setJobIdIfNecessary is 
> here.|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L191C18-L191C37]
> But according to [Flink's 
> code|https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L213],
>  there are two situations.
> 1. HA is not activated and the job id is not set when submitting the 
> application (lines 213 to 217). runApplicationAsync is called with 
> enforceSingleJobExecution=false, so multi-job execution is viable.
> 2. Otherwise (lines 218 to 233), if the job id is not set when submitting 
> the application, it is derived from the cluster id. Once the job id is 
> fixed, runApplicationAsync is called with enforceSingleJobExecution=true, so 
> multi-job execution is not viable.
> If flink-kubernetes-operator always sets the job id when submitting an 
> application, the condition of situation #1 will never match. So an 
> application submitted with flink-kubernetes-operator cannot execute multiple 
> jobs, even if it is deployed in application mode with HA turned off.
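
The two situations described in the quoted issue can be modelled in a few lines of plain Java. This is only a sketch of the reasoning, not the Flink source: the class SingleJobEnforcementSketch, the ToyEnvironment stand-in, and the runScript helper are hypothetical names, and IllegalStateException stands in for the FlinkRuntimeException seen in the stack trace. It shows why a script with two statements stops after the first job once a job id has been fixed.

```java
import java.util.Optional;

// Hypothetical model of the behaviour described in the issue; it does not use Flink classes.
class SingleJobEnforcementSketch {

    // Decides whether only one job may run, following the two situations in the issue text.
    static boolean enforceSingleJobExecution(boolean haActivated, Optional<String> configuredJobId) {
        // Situation 1: no HA and no configured job id, so multiple jobs are allowed.
        if (!haActivated && !configuredJobId.isPresent()) {
            return false;
        }
        // Situation 2: a job id is configured (or derived from the cluster id), so one job only.
        return true;
    }

    // Toy stand-in for an execution environment that counts submitted jobs.
    static final class ToyEnvironment {
        private final boolean enforceSingleJob;
        private int submittedJobs = 0;

        ToyEnvironment(boolean enforceSingleJob) {
            this.enforceSingleJob = enforceSingleJob;
        }

        void execute(String statement) {
            // Reject a second submission when single-job execution is enforced.
            if (enforceSingleJob && submittedJobs > 0) {
                throw new IllegalStateException(
                        "Cannot have more than one execute() or executeAsync() call in a single environment.");
            }
            submittedJobs++;
        }

        int submittedJobs() {
            return submittedJobs;
        }
    }

    // Submits the statements in order and returns how many were accepted before any failure.
    static int runScript(ToyEnvironment env, String... statements) {
        try {
            for (String statement : statements) {
                env.execute(statement);
            }
        } catch (IllegalStateException e) {
            // The script stops at the first rejected statement.
        }
        return env.submittedJobs();
    }

    public static void main(String[] args) {
        // Operator-style submission: a job id is always set, so single-job execution is enforced.
        boolean enforced = enforceSingleJobExecution(false, Optional.of("operator-generated-id"));
        System.out.println("job id set:   accepted "
                + runScript(new ToyEnvironment(enforced), "select 1", "select 1") + " job(s)");

        // No HA and no job id: both statements are accepted.
        boolean relaxed = enforceSingleJobExecution(false, Optional.empty());
        System.out.println("job id unset: accepted "
                + runScript(new ToyEnvironment(relaxed), "select 1", "select 1") + " job(s)");
    }
}
```

In this model the only input that differs between the failing and the working case is whether a job id is present, which matches the reporter's observation that an operator-submitted application never reaches situation #1.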



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
