[
https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745668#comment-17745668
]
Fabio Wanner commented on FLINK-32592:
--------------------------------------
We did a few hundred deployments and did not run into the issue anymore! Looks
resolved! Thanks a lot!
> (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
> ------------------------------------------------------------
>
> Key: FLINK-32592
> URL: https://issues.apache.org/jira/browse/FLINK-32592
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.15.4, 1.18.0, 1.17.1
> Reporter: Fabio Wanner
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> *Context*
> We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a
> single session cluster. The job submissions done by the operator happen
> concurrently, basically at the same time.
> Operator version: 1.5.0
> Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)
> *Problem*
> Rarely (~once every 50 deployments) one of the jobs will not be executed. In
> the following incident 4 jobs are deployed at the same time:
> * gorner-task-staging-e5730831
> * gorner-facility-staging-e5730831
> * gorner-aepp-staging-e5730831
> * gorner-session-staging-e5730831
>
> The operator submits the jobs, and they all get a reasonable jobID:
> {code:java}
> 2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390000000000000002 to session cluster.
> 2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0000000000000002 to session cluster.
> 2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10000000000000002 to session cluster.
> 2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0000000000000002 to session cluster.
> {code}
> In the cluster, the JarRunHandler's handleRequest() method receives the
> requests, and all 4 jobIDs are present (all args etc. are correct as well):
> {code:java}
> 2023-07-14 10:25:35,320 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 4968b186061e44390000000000000002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: de59304d370b4b8e0000000000000002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0000000000000002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 103c0446e14749a10000000000000002
> {code}
> But by the time the EmbeddedExecutor's submitAndGetJobClientFuture() method
> is called, instead of one call per jobID we see 4 calls with one of the
> jobIDs appearing twice:
> {code:java}
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390000000000000002]
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10000000000000002]
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
> 2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
> {code}
> In case this is relevant: the jobGraph obtained does not match the jobID. We
> get de59304d370b4b8e0000000000000002 twice, but the jobGraph for this jobID
> is never returned by getJobGraph() in
> EmbeddedExecutor.submitAndGetJobClientFuture().
> This will then lead to the job already existing:
> {code:java}
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: [de59304d370b4b8e0000000000000002]
> {code}
> But since the jobs are completely different, the execution will fail with
> one of the following exceptions, depending on the timing:
> * RestHandlerException: No jobs included in application
> * ClassNotFoundException:
> io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor
>
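For readers who want to see the class of bug the issue title points at, here is a minimal sketch of how a process-wide "context" slot can be overwritten by a concurrent submission. It is not Flink code: ContextFactoryRace, install(), current() and the job names are hypothetical stand-ins, and the thread-local variant is only one common way to avoid such a race, not necessarily what the actual fix does. Latches force the unlucky interleaving so the outcome is deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these names exist in Flink.
final class ContextFactoryRace {
    // Process-wide slot: every submission thread writes to the same field.
    private static volatile Supplier<String> sharedFactory;
    // Per-thread slot: each submission thread only sees its own value.
    private static final ThreadLocal<Supplier<String>> threadLocalFactory = new ThreadLocal<>();

    static void install(boolean useThreadLocal, Supplier<String> factory) {
        if (useThreadLocal) {
            threadLocalFactory.set(factory);
        } else {
            sharedFactory = factory;
        }
    }

    static Supplier<String> current(boolean useThreadLocal) {
        return useThreadLocal ? threadLocalFactory.get() : sharedFactory;
    }

    /** Runs two simulated submissions and returns the job that submission A ends up executing. */
    static String runSubmissions(boolean useThreadLocal) {
        CountDownLatch aInstalled = new CountDownLatch(1);
        CountDownLatch bInstalled = new CountDownLatch(1);
        String[] seenByA = new String[1];

        Thread a = new Thread(() -> {
            install(useThreadLocal, () -> "job-A");
            aInstalled.countDown();
            await(bInstalled); // B installs its factory before A reads its own
            seenByA[0] = current(useThreadLocal).get();
        });
        Thread b = new Thread(() -> {
            await(aInstalled);
            install(useThreadLocal, () -> "job-B");
            bInstalled.countDown();
        });

        a.start();
        b.start();
        join(a);
        join(b);
        return seenByA[0];
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shared slot, A executes:       " + runSubmissions(false));
        System.out.println("thread-local slot, A executes: " + runSubmissions(true));
    }
}
```

With the shared slot, submission A picks up the job that B installed, which mirrors the duplicated jobID and the missing job in the logs above. With the per-thread slot, A keeps its own job.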