Fabio Wanner created FLINK-32592:
------------------------------------
Summary: Mixed-up job execution on concurrent job submission
Key: FLINK-32592
URL: https://issues.apache.org/jira/browse/FLINK-32592
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.17.1, 1.15.4, 1.18.0
Reporter: Fabio Wanner
*Context*
We use the flink-k8s-operator to deploy multiple jobs (up to 32) to a
single session cluster. The operator submits the jobs concurrently, at
essentially the same time.
Operator version: 1.5.0
Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)
*Problem*
Rarely (roughly once every 50 deployments), one of the jobs is not executed.
In the following incident, 4 jobs were deployed at the same time:
* gorner-task-staging-e5730831
* gorner-facility-staging-e5730831
* gorner-aepp-staging-e5730831
* gorner-session-staging-e5730831
The operator submits the jobs, and each one gets a reasonable jobID:
{code:java}
2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390000000000000002 to session cluster.
2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0000000000000002 to session cluster.
2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10000000000000002 to session cluster.
2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0000000000000002 to session cluster.
{code}
In the cluster, the JarRunHandler's handleRequest() method receives the
requests. All 4 jobIDs are present (and all args, etc. are correct):
{code:java}
2023-07-14 10:25:35,320 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 4968b186061e44390000000000000002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: de59304d370b4b8e0000000000000002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0000000000000002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 103c0446e14749a10000000000000002
{code}
But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called,
instead of 1 call per jobID we see 4 calls in which one jobID appears twice and
another is missing:
{code:java}
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390000000000000002]
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10000000000000002]
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
{code}
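The mismatch between the two logs above can be checked mechanically. The following is only a small diagnostic sketch, not part of Flink or the operator: the class name JobIdAudit and its two helper methods are made up for illustration, and the ID lists are copied by hand from the JarRunHandler and EmbeddedExecutor log excerpts. It reports which requested jobID never reached the executor and which one arrived more than once.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for comparing job IDs seen at two points in the logs.
public class JobIdAudit {

    /** Returns the IDs that were requested but never seen by the executor. */
    static Set<String> missing(List<String> requested, List<String> executed) {
        Set<String> result = new LinkedHashSet<>(requested);
        result.removeAll(executed);
        return result;
    }

    /** Returns the IDs that the executor saw more than once. */
    static Set<String> duplicated(List<String> executed) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> dupes = new LinkedHashSet<>();
        for (String id : executed) {
            if (!seen.add(id)) {
                dupes.add(id);
            }
        }
        return dupes;
    }

    public static void main(String[] args) {
        // IDs logged by JarRunHandler.handleRequest()
        List<String> requested = List.of(
                "4968b186061e44390000000000000002",
                "de59304d370b4b8e0000000000000002",
                "91a5260d916c4dff0000000000000002",
                "103c0446e14749a10000000000000002");
        // IDs logged by EmbeddedExecutor (optJobId)
        List<String> executed = List.of(
                "4968b186061e44390000000000000002",
                "103c0446e14749a10000000000000002",
                "de59304d370b4b8e0000000000000002",
                "de59304d370b4b8e0000000000000002");

        System.out.println("missing:    " + missing(requested, executed));
        System.out.println("duplicated: " + duplicated(executed));
    }
}
```

For this incident the missing ID is 91a5260d916c4dff0000000000000002, which the operator log assigns to gorner-facility-staging-e5730831, and the duplicated ID is de59304d370b4b8e0000000000000002 (gorner-session-staging-e5730831).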
It may also be relevant that the jobGraph obtained does not match the jobID. We
get de59304d370b4b8e0000000000000002 twice, but the jobGraph for this jobID is
never returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().
As a result, the last call finds that the job already exists:
{code:java}
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: [de59304d370b4b8e0000000000000002]
{code}
But since the jobs are completely different, the execution fails. Depending
on the timing, it fails with one of the following exceptions:
* RestHandlerException: No jobs included in application
* ClassNotFoundException:
io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor