[https://issues.apache.org/jira/browse/FLINK-36010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Ahmed Hamdy updated FLINK-36010:
--------------------------------
Description:
h2. Description
When a job runs in single-job execution mode with HA enabled, Flink [short-circuits duplicate execution of a job after it has reached a globally terminal state|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L316].
However, this path returns {{ApplicationStatus.SUCCEEDED}} even when the job's original globally terminal status is FAILED. This breaks the [consistency of states|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L48].
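The short-circuit described above can be modelled roughly as follows. This is a hypothetical, simplified sketch rather than Flink source: {{ShortCircuitModel}}, {{DuplicateSubmission}} and {{reportedOutcome}} are invented names, {{AppStatus}} stands in for {{ApplicationStatus}}, and treating every other case as an error is a simplification. It only illustrates that the recorded status of the first execution is never consulted.

{code:java}
import java.util.Objects;

/**
 * Hypothetical model (not Flink source) of the reported short-circuit.
 * AppStatus is a stand-in for org.apache.flink.runtime.clusterframework.ApplicationStatus.
 */
public class ShortCircuitModel {

    public enum AppStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** What is known about a rejected resubmission (hypothetical type). */
    public static final class DuplicateSubmission {
        final String jobId;
        final boolean globallyTerminated;
        // Status of the first execution, e.g. as stored in the JobResultStore.
        final AppStatus recordedStatus;

        public DuplicateSubmission(String jobId, boolean globallyTerminated, AppStatus recordedStatus) {
            this.jobId = Objects.requireNonNull(jobId);
            this.globallyTerminated = globallyTerminated;
            this.recordedStatus = Objects.requireNonNull(recordedStatus);
        }
    }

    /** Behaviour as reported: recordedStatus is ignored, so FAILED is turned into SUCCEEDED. */
    public static AppStatus reportedOutcome(boolean singleJobExecution, DuplicateSubmission dup) {
        if (singleJobExecution && dup.globallyTerminated) {
            return AppStatus.SUCCEEDED;
        }
        // Simplification: any other duplicate submission is treated as an error.
        throw new IllegalStateException("Duplicate submission of job " + dup.jobId + " is not tolerated");
    }

    public static void main(String[] args) {
        DuplicateSubmission dup =
                new DuplicateSubmission("3d98a3e455408045defa90c6b0b03a5a", true, AppStatus.FAILED);
        System.out.println(reportedOutcome(true, dup)); // prints SUCCEEDED
    }
}
{code}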
h2. Reproducing steps
1- Create the Flink ConfigMap and service account as in
[https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe]
2- Submit the Flink job and TaskManager deployments, similar to
[https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml]
3- Delete TaskManager pods to force job failovers until the restart attempts are exhausted. The job then reaches the terminal state {{FAILED}}.
4- The Kubernetes batch Job restarts the JobManager (JobManager failover). Because of single-job execution mode, the restarted JobManager does not run the job again: the Dispatcher ignores the resubmission.
5- The application is recorded as {{SUCCEEDED}}, even though the globally terminal state it detected is {{FAILED}}.
{code:java}
Printing result to stdout. Use --output to specify output path.
2024-08-08 12:32:50,110 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] -
Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
2024-08-08 12:32:50,110 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] -
Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
2024-08-08 12:32:50,118 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received
JobGraph submission 'CarTopSpeedWindowingExample'
(3d98a3e455408045defa90c6b0b03a5a).
2024-08-08 12:32:50,164 WARN
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring
JobGraph submission 'CarTopSpeedWindowingExample'
(3d98a3e455408045defa90c6b0b03a5a) because the job already reached a
globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous
execution.
2024-08-08 12:32:50,171 INFO
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
[] - Application completed SUCCESSFULLY
2024-08-08 12:32:50,171 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED.
Diagnostics null.
{code}
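To check a reproduction run for this inconsistency automatically, a small helper like the following could scan the JobManager log. It is a hypothetical sketch, not part of Flink: the class name {{ShutdownStatusCheck}} and the matched phrases are assumptions taken from the log lines above, and the JobResultStore status is passed in by hand.

{code:java}
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink) that flags the inconsistency in a JobManager log. */
public class ShutdownStatusCheck {

    // Matches the ClusterEntrypoint shutdown message, e.g. "... down with application status SUCCEEDED."
    private static final Pattern SHUTDOWN_STATUS =
            Pattern.compile("down with application status (\\w+)");

    /** Returns the application status reported on shutdown, if the log contains it. */
    public static Optional<String> shutdownStatus(String log) {
        Matcher m = SHUTDOWN_STATUS.matcher(log);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** True when a resubmission was ignored but the shutdown status differs from the stored one. */
    public static boolean isInconsistent(String log, String jobResultStoreStatus) {
        boolean duplicateIgnored = log.contains("Ignoring JobGraph submission");
        return duplicateIgnored
                && shutdownStatus(log).map(s -> !s.equals(jobResultStoreStatus)).orElse(false);
    }

    public static void main(String[] args) {
        String log = String.join("\n",
                "2024-08-08 12:32:50,164 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - "
                        + "Ignoring JobGraph submission 'CarTopSpeedWindowingExample' "
                        + "(3d98a3e455408045defa90c6b0b03a5a) because the job already reached a "
                        + "globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.",
                "2024-08-08 12:32:50,171 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - "
                        + "Shutting StandaloneApplicationClusterEntryPoint down with application status "
                        + "SUCCEEDED. Diagnostics null.");
        System.out.println(isInconsistent(log, "FAILED")); // prints true
    }
}
{code}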
h3. Resource Highlights
{code:java}
flink-conf.yaml: |
  job-result-store.delete-on-commit: false
  high-availability.type: ZOOKEEPER
  high-availability.jobmanager.port: 6125
  high-availability.storageDir: s3a://flink-high-availability-dir
  high-availability.zookeeper.quorum: my-release-zookeeper:2181
  high-availability.zookeeper.path.root: /flink
  jobmanager.execution.slot-allocation-timeout: 10000
{code}
{code:java}
spec:
  restartPolicy: OnFailure
  containers:
    - name: jobmanager
      image: apache/flink:1.20.0-scala_2.12
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
      args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname",
             "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing",
             "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
{code}
h2. Expected Outcome
On resubmission, the application status should be recorded as FAILED instead of SUCCEEDED, because the job's globally terminal state is FAILED.
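A minimal sketch of the expected behaviour, assuming the globally terminal job status of the first execution is available when the duplicate submission is detected. {{ExpectedOutcome}}, {{TerminalJobStatus}} and {{forDuplicateSubmission}} are hypothetical names; the enums are simplified stand-ins for Flink's {{JobStatus}} and {{ApplicationStatus}}, and the {{ApplicationStatus}} class linked in the description is the natural reference for the real mapping.

{code:java}
/**
 * Hypothetical sketch: derive the application status from the globally terminal
 * job status instead of hard-coding SUCCEEDED for a duplicate submission.
 */
public class ExpectedOutcome {

    /** The globally terminal job states named in the Dispatcher's WARN message. */
    public enum TerminalJobStatus { FINISHED, FAILED, CANCELED }

    /** Simplified stand-in for ApplicationStatus (UNKNOWN omitted). */
    public enum AppStatus { SUCCEEDED, FAILED, CANCELED }

    public static AppStatus forDuplicateSubmission(TerminalJobStatus status) {
        switch (status) {
            case FINISHED:
                return AppStatus.SUCCEEDED;
            case FAILED:
                return AppStatus.FAILED;
            case CANCELED:
                return AppStatus.CANCELED;
            default:
                throw new IllegalArgumentException("Not a terminal status: " + status);
        }
    }

    public static void main(String[] args) {
        System.out.println(forDuplicateSubmission(TerminalJobStatus.FAILED)); // prints FAILED
    }
}
{code}

Keeping the mapping one-to-one means a resubmitted job can never report a better outcome than its first execution did.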
h3. Appendix: JobResultStore on HA
{code:java}
{"result":{"id":"3d98a3e455408045defa90c6b0b03a5a","application-status":"FAILED","accumulator-results":{},"net-runtime":55991,"failure-cause":{"class":"org.apache.flink.runtime.JobException","stack-trace":"org.apache.flink.runtime.JobException:
Recovery is suppressed by
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2,
backoffTimeMS=1).....{code}
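The entry above already carries the status needed for the expected outcome. As a hypothetical illustration (not Flink API), the helper below pulls {{application-status}} out of such an entry with a regular expression so that the sketch needs no JSON library; the sample string is a trimmed excerpt of the entry above with the failure cause omitted.

{code:java}
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not Flink API) reading "application-status" from a JobResultStore entry. */
public class JobResultStoreEntry {

    // Matches e.g. "application-status":"FAILED"
    private static final Pattern APPLICATION_STATUS =
            Pattern.compile("\"application-status\"\\s*:\\s*\"([A-Z]+)\"");

    public static Optional<String> applicationStatus(String json) {
        Matcher m = APPLICATION_STATUS.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Trimmed excerpt of the entry above; the failure cause is omitted.
        String entry = "{\"result\":{\"id\":\"3d98a3e455408045defa90c6b0b03a5a\","
                + "\"application-status\":\"FAILED\",\"accumulator-results\":{},\"net-runtime\":55991}}";
        System.out.println(applicationStatus(entry).orElse("UNKNOWN")); // prints FAILED
    }
}
{code}

A real fix would presumably read the stored result through Flink's JobResultStore rather than parsing the file by hand; the regex only keeps the sketch dependency-free.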
> Duplicate Job Submission returns Succeeded on single execution even if Global
> state is FAILED
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-36010
> URL: https://issues.apache.org/jira/browse/FLINK-36010
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.8.3, 1.17.2, 1.20.0, 1.19.1
> Reporter: Ahmed Hamdy
> Priority: Major
> Fix For: 1.20.1
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)