Gyula Fora created FLINK-30268:
----------------------------------
Summary: HA metadata and other cluster submission related errors
should not throw DeploymentFailedException
Key: FLINK-30268
URL: https://issues.apache.org/jira/browse/FLINK-30268
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Peter Vary
Fix For: kubernetes-operator-1.3.0
Currently, most critical cluster submission errors, as well as the checks that
validate HA metadata before deployment, end up throwing DeploymentFailedException.
This puts the operator into an inconsistent state and hides the original error in
subsequent reconcile loops, where it is replaced by a generic "This indicates a
bug..." failure:
{noformat}
flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info | UPGRADING | The resource is being upgraded
flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Event | Info | SUBMIT | Starting deployment
flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example] Deploying application cluster requiring last-state from HA metadata
flink-kubernetes-operator 2022-12-01 21:55:03,997 o.a.f.k.o.c.FlinkDeploymentController [ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Event | Warning | RESTOREFAILED | HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] End of reconciliation
flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]}
flink-kubernetes-operator 2022-12-01 21:55:19,056 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] Starting reconciliation
flink-kubernetes-operator 2022-12-01 21:55:19,058 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/basic-checkpoint-ha-example] UPGRADE change(s) detected (FlinkDeploymentSpec[job.state=RUNNING] differs from FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info | UPGRADING | The resource is being upgraded
flink-kubernetes-operator 2022-12-01 21:55:19,119 o.a.f.k.o.r.d.ApplicationReconciler [ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment:
FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample,
jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING,
startTime=1669931677233, updateTime=1669931696153,
savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null,
triggerTimestamp=null, triggerType=null, formatType=null, savepointHistory=[],
lastPeriodicSavepointTimestamp=0)), error=null),
clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @
2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR,
reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059,
lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}},
lastStableSpec=null, state=UPGRADING)),
taskManager=TaskManagerInfo(labelSelector=, replicas=0))
flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Event | Warning | CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
flink-kubernetes-operator 2022-12-01 21:55:19,136 o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example] Attempt count: 0, last attempt: false
flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException: This indicates a bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This indicates a bug..."}]}
flink-kubernetes-operator 2022-12-01 21:55:19,164 i.j.o.p.e.ReconciliationDispatcher [ERROR][default/basic-checkpoint-ha-example] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-checkpoint-ha-example', namespace='default'}, version: 350553} failed.
flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.RuntimeException: This indicates a bug...
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator Caused by: java.lang.RuntimeException: This indicates a bug...
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
flink-kubernetes-operator ... 13 more
{noformat}
The root cause is that DeploymentFailedException was originally introduced so
that the observer could signal a JobManager deployment failure after the cluster
had been submitted. The error handler in the controller therefore sets the
jmDeploymentStatus to ERROR and updates the job state (RECONCILING in the log
above). That is wrong when nothing was submitted, and it causes the next deploy
attempt to reject the status as invalid.
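The two reconcile loops in the log can be reproduced with a toy model. This is a minimal sketch, not operator code: `ToyStatus`, `ToyOperator`, `deploy()` and `reconcile()` are hypothetical names, the enums list only a few illustrative values, and the precondition in `deploy()` (a JobManager deployment that is not MISSING is only accepted when the job is in a terminal state) is an assumption inferred from the "Invalid status for deployment" and "This indicates a bug..." lines above. The first loop fails the HA check before anything is submitted, yet the handler built for observer-detected failures still marks the JobManager deployment as ERROR and the job as RECONCILING; the second loop then trips the precondition instead of reporting the HA problem.

```java
import java.util.EnumSet;

// Hypothetical, heavily simplified model of the failure loop; not the operator's real classes.
enum JmDeploymentStatus { MISSING, DEPLOYING, READY, ERROR }

enum JobState { CREATED, RUNNING, RECONCILING, FINISHED, FAILED, CANCELED }

class DeploymentFailedException extends RuntimeException {
    DeploymentFailedException(String message) { super(message); }
}

final class ToyStatus {
    JmDeploymentStatus jmDeploymentStatus = JmDeploymentStatus.MISSING;
    JobState jobState = JobState.CREATED;
}

final class ToyOperator {
    static final EnumSet<JobState> TERMINAL =
            EnumSet.of(JobState.FINISHED, JobState.FAILED, JobState.CANCELED);

    final ToyStatus status = new ToyStatus();
    boolean haMetadataExists = false; // the HA configmaps are gone in the reported scenario

    // Assumed status precondition, then the HA metadata check that runs before submission.
    void deploy() {
        if (status.jmDeploymentStatus != JmDeploymentStatus.MISSING
                && !TERMINAL.contains(status.jobState)) {
            throw new RuntimeException("This indicates a bug...");
        }
        if (!haMetadataExists) {
            throw new DeploymentFailedException(
                    "HA metadata not available to restore from last state.");
        }
        status.jmDeploymentStatus = JmDeploymentStatus.DEPLOYING;
        status.jobState = JobState.RECONCILING;
    }

    // One reconcile loop with the current handling: every DeploymentFailedException
    // is treated as if an already-submitted JobManager deployment had failed.
    String reconcile() {
        try {
            deploy();
            return "deployed";
        } catch (DeploymentFailedException e) {
            status.jmDeploymentStatus = JmDeploymentStatus.ERROR;
            status.jobState = JobState.RECONCILING;
            return "DeploymentFailedException: " + e.getMessage();
        }
    }
}
```

Calling `reconcile()` twice on a fresh `ToyOperator` returns the DeploymentFailedException message once and then throws `RuntimeException("This indicates a bug...")`, mirroring the 21:55:03 and 21:55:19 loops in the log.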
To avoid this, we should introduce a new exception type or use something more
suitable. In most of these cases we should not touch the jobManagerDeploymentStatus
or the job status at all, and simply retrigger the reconciliation. This will keep
the CR in an error loop that keeps emitting warnings, but that is expected in these
critical failure scenarios.
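The proposal can be sketched in the same simplified setting. This is a hypothetical illustration, not the eventual fix: `ClusterSubmissionException` is an invented name standing in for "a new exception type or something more suitable", and `ToyStatus`, `ToyReconciler` and `rescheduledLoops` are likewise illustrative. Errors raised before or during submission are recorded on the status and the loop is rescheduled, while the JobManager deployment status and the job state are left untouched; DeploymentFailedException keeps its original, observer-side meaning. The `deploy()` precondition is the same assumption as in the model of the current behavior.

```java
import java.util.EnumSet;

// Hypothetical sketch of the proposed handling; all names are illustrative only.
enum JmDeploymentStatus { MISSING, DEPLOYING, READY, ERROR }

enum JobState { CREATED, RUNNING, RECONCILING, FINISHED, FAILED, CANCELED }

/** Original purpose: the observer saw an already-submitted JobManager deployment fail. */
class DeploymentFailedException extends RuntimeException {
    DeploymentFailedException(String message) { super(message); }
}

/** Hypothetical new type for errors raised before or during cluster submission. */
class ClusterSubmissionException extends RuntimeException {
    ClusterSubmissionException(String message) { super(message); }
}

final class ToyStatus {
    JmDeploymentStatus jmDeploymentStatus = JmDeploymentStatus.MISSING;
    JobState jobState = JobState.CREATED;
    String error; // last error surfaced to the user, null when healthy
}

final class ToyReconciler {
    static final EnumSet<JobState> TERMINAL =
            EnumSet.of(JobState.FINISHED, JobState.FAILED, JobState.CANCELED);

    final ToyStatus status = new ToyStatus();
    boolean haMetadataExists = false;
    int rescheduledLoops = 0; // stands in for "retrigger the reconciliation"

    void deploy() {
        if (status.jmDeploymentStatus != JmDeploymentStatus.MISSING
                && !TERMINAL.contains(status.jobState)) {
            throw new RuntimeException("This indicates a bug...");
        }
        if (!haMetadataExists) {
            throw new ClusterSubmissionException(
                    "HA metadata not available to restore from last state.");
        }
        status.jmDeploymentStatus = JmDeploymentStatus.DEPLOYING;
        status.jobState = JobState.RECONCILING;
        status.error = null;
    }

    void reconcile() {
        try {
            deploy();
        } catch (ClusterSubmissionException e) {
            // Nothing was submitted: keep jmDeploymentStatus and jobState as they are,
            // surface the error and schedule another attempt.
            status.error = e.getMessage();
            rescheduledLoops++;
        } catch (DeploymentFailedException e) {
            // Observer-side failure of a submitted deployment (never thrown by this toy deploy()).
            status.jmDeploymentStatus = JmDeploymentStatus.ERROR;
            status.jobState = JobState.RECONCILING;
            status.error = e.getMessage();
        }
    }
}
```

Each failed loop leaves the resource in the same state, so the HA error stays visible on every retry, and a later loop can deploy normally once the underlying problem is fixed (simulated here by setting `haMetadataExists` to true). The cost is the repeated warnings that the issue already accepts.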