Guozhen Yang created FLINK-32423:
------------------------------------
Summary: Flink-sql-runner-example application fails if execute() is called
multiple times in one SQL file
Key: FLINK-32423
URL: https://issues.apache.org/jira/browse/FLINK-32423
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Reporter: Guozhen Yang
h2. Summary:
The flink-sql-runner-example application fails if execute() is called multiple times in one SQL file.
h2. Background:
We have a series of batch jobs running on a table partitioned by date. The jobs need to run sequentially in chronological order: only after batch job #1 has finished processing the 2023-06-01 partition does batch job #2, which processes the 2023-06-02 partition, start. So we loop through the dates and submit multiple jobs from a single application, and the Flink application is deployed in application mode with HA turned off.
According to the [Flink documentation|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#application-mode], Application Mode allows the submission of applications consisting of multiple jobs, but High Availability is not supported in these cases.
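For illustration, the driver program is roughly shaped like the minimal sketch below (not our production code; the class name, table names, partition column, and date range are placeholders, and the source/sink tables are assumed to be registered already):
{noformat}
import java.time.LocalDate;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DailyBackfill {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // One application, multiple jobs: each executeSql(...).await() submits a
        // batch job and blocks until it finishes, so the partitions are processed
        // strictly in chronological order.
        for (LocalDate date = LocalDate.of(2023, 6, 1);
                !date.isAfter(LocalDate.of(2023, 6, 30));
                date = date.plusDays(1)) {
            tableEnv.executeSql(
                            "INSERT INTO sink_table SELECT * FROM source_table WHERE dt = '"
                                    + date + "'")
                    .await();
        }
    }
}
{noformat}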
h2. The problem:
The application consisting of multiple jobs fails when the second job is executed.
The stack trace is shown below:
{noformat}
2023-06-21 03:21:44,720 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.16.2.jar:1.16.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 13 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 13 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:217) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:205) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 13 more
{noformat}
h2. How to reproduce:
1. Start a minikube cluster.
2. Add a new script file _two-selects.sql_ to the [examples/flink-sql-runner-example/sql-scripts folder|https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]. The contents of _two-selects.sql_ are shown below.
{noformat}
select 1;
select 1;
{noformat}
3. Follow the [instructions|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/README.md] to build the flink-sql-runner-example image.
4. Use the minikube image load command to load the image.
5. Modify the [flinkdep yaml file|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-example.yaml]: change spec.job.args to args: ["/opt/flink/usrlib/sql-scripts/two-selects.sql"]. Then apply the flinkdep yaml file.
6. The application fails (see the sketch after these steps for why the second statement triggers the failure).
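For context on why two SELECT statements lead to two execute() calls, the example's SqlRunner is essentially a loop that hands each statement to executeSql. The following is a simplified paraphrase, not the exact source of the example:
{noformat}
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlRunnerSketch {
    public static void main(String[] args) throws Exception {
        String script = new String(Files.readAllBytes(Paths.get(args[0])));
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Each non-empty statement becomes its own executeSql() call, i.e. its own
        // execute()/executeAsync() on the same environment. With two-selects.sql the
        // second call fails with "Cannot have more than one execute() or executeAsync()
        // call in a single environment." when single-job execution is enforced.
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tableEnv.executeSql(statement);
            }
        }
    }
}
{noformat}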
h2. Possible reason:
According to the [flink-kubernetes-operator documentation|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/controller-flow/#application-reconciler], Flink by default generates deterministic job ids based on the clusterId:
{quote}Flink by default generates deterministic jobids based on the clusterId
(which is the CR name in our case). This causes checkpoint path conflicts if
the job is ever restarted from an empty state (stateless upgrade). We therefore
generate a random jobid to avoid this.
{quote}
I found that flink-kubernetes-operator always sets the job id when submitting an application. [The corresponding code of setJobIdIfNecessary is here.|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L191C18-L191C37]
But according to [Flink's code|https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L213], there are two situations (summarized in the sketch after this list):
1. HA is not activated and the job id is not set when the application is submitted (lines 213 to 217): runApplicationAsync is called with enforceSingleJobExecution=false, so multi-job execution is viable.
2. Otherwise (lines 218 to 233): if the job id is not set, it is generated from the cluster id. Once the job id is fixed, runApplicationAsync is called with enforceSingleJobExecution=true, so multi-job execution is not viable.
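To make the two situations concrete, the decision can be summarized by a small hypothetical helper. This is a simplified paraphrase of the branching in ApplicationDispatcherBootstrap, not the actual Flink code:
{noformat}
public class EnforceSingleJobExecutionSketch {

    // Simplified paraphrase of ApplicationDispatcherBootstrap's branching
    // (lines 213 to 233 in the linked source); not the actual Flink code.
    static boolean enforceSingleJobExecution(boolean haActivated, String configuredJobId) {
        if (!haActivated && configuredJobId == null) {
            // Situation #1: multi-job execution is allowed.
            return false;
        }
        // Situation #2: a job id is (or will be) fixed, so only a single job may run.
        return true;
    }

    public static void main(String[] args) {
        // flink-kubernetes-operator always configures a job id, so even with HA
        // turned off the application ends up in situation #2:
        System.out.println(enforceSingleJobExecution(false, "a1b2c3d4e5f600000000000000000000"));
        // prints: true
    }
}
{noformat}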
Since flink-kubernetes-operator always sets the job id when submitting an application, the condition of situation #1 never matches. So an application submitted with flink-kubernetes-operator cannot execute multiple jobs, even if it is deployed in application mode with HA turned off.