[
https://issues.apache.org/jira/browse/FLINK-30268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gyula Fora updated FLINK-30268:
-------------------------------
Issue Type: Bug (was: Improvement)
> HA metadata and other cluster submission related errors should not throw
> DeploymentFailedException
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-30268
> URL: https://issues.apache.org/jira/browse/FLINK-30268
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Reporter: Gyula Fora
> Assignee: Peter Vary
> Priority: Blocker
> Fix For: kubernetes-operator-1.3.0
>
>
> Currently, most critical cluster submission errors, and the checks that validate
> HA metadata before deployment, end up throwing DeploymentFailedException.
> This puts the operator into an inconsistent state and effectively hides the original
> error in subsequent reconciliation loops:
> {noformat}
> flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info |
> UPGRADING | The resource is being upgraded
> flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Event | Info | SUBMIT
> | Starting deployment
> flink-kubernetes-operator 2022-12-01 21:55:03,992
> o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example]
> Deploying application cluster requiring last-state from HA metadata
> flink-kubernetes-operator 2022-12-01 21:55:03,997
> o.a.f.k.o.c.FlinkDeploymentController
> [ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
> flink-kubernetes-operator
> org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA
> metadata not available to restore from last state. It is possible that the
> job has finished or terminally failed, or the configmaps have been deleted.
> Manual restore required.
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> flink-kubernetes-operator at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> flink-kubernetes-operator at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown
> Source)
> flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Event | Warning |
> RESTOREFAILED | HA metadata not available to restore from last state. It is
> possible that the job has finished or terminally failed, or the configmaps
> have been deleted. Manual restore required.
> flink-kubernetes-operator 2022-12-01 21:55:04,034
> o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][default/basic-checkpoint-ha-example] End of reconciliation
> flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error |
> UPGRADING |
> {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA
> metadata not available to restore from last state. It is possible that the
> job has finished or terminally failed, or the configmaps have been deleted.
> Manual restore
> required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]}
>
> flink-kubernetes-operator 2022-12-01 21:55:19,056
> o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][default/basic-checkpoint-ha-example] Starting reconciliation
> flink-kubernetes-operator 2022-12-01 21:55:19,058
> o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
> ][default/basic-checkpoint-ha-example] UPGRADE change(s) detected
> (FlinkDeploymentSpec[job.state=RUNNING] differs from
> FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
> flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info |
> UPGRADING | The resource is being upgraded
> flink-kubernetes-operator 2022-12-01 21:55:19,119
> o.a.f.k.o.r.d.ApplicationReconciler
> [ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment:
> FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample,
> jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING,
> startTime=1669931677233, updateTime=1669931696153,
> savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null,
> triggerTimestamp=null, triggerType=null, formatType=null,
> savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error=null),
> clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @
> 2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR,
> reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059,
>
> lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}},
> lastStableSpec=null, state=UPGRADING)),
> taskManager=TaskManagerInfo(labelSelector=, replicas=0))
> flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Event | Warning |
> CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
> flink-kubernetes-operator 2022-12-01 21:55:19,136
> o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example]
> Attempt count: 0, last attempt: false
> flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils
> [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error |
> UPGRADING |
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException:
> This indicates a
> bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This
> indicates a bug..."}]}
> flink-kubernetes-operator 2022-12-01 21:55:19,164
> i.j.o.p.e.ReconciliationDispatcher
> [ERROR][default/basic-checkpoint-ha-example] Error during event processing
> ExecutionScope{ resource id: ResourceID{name='basic-checkpoint-ha-example',
> namespace='default'}, version: 350553} failed.
> flink-kubernetes-operator
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> java.lang.RuntimeException: This indicates a bug...
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> flink-kubernetes-operator at
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> flink-kubernetes-operator at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> flink-kubernetes-operator at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown
> Source)
> flink-kubernetes-operator Caused by: java.lang.RuntimeException: This
> indicates a bug...
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
> flink-kubernetes-operator at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
> flink-kubernetes-operator ... 13 more
> {noformat}
> The root cause is that DeploymentFailedException was originally introduced so that
> the observer could signal a JobManager deployment failure (after the cluster had
> been submitted). The error handler logic in the controller therefore updates the
> jobManagerDeploymentStatus and the job state, which causes the problem shown above.
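> As a minimal, self-contained sketch of the problematic pattern (all class and field
> names below are illustrative, not the actual operator code): the same exception type
> covers both "the submitted JM deployment went bad" and "we refused to submit at all",
> so the handler always rewrites the status.
> {code:java}
> // Sketch only: illustrative names, not the real operator classes.
> enum JmDeploymentState { READY, DEPLOYING, ERROR }
>
> class StatusSketch {
>     JmDeploymentState jmDeploymentStatus = JmDeploymentState.READY;
>     String jobState = "RUNNING";
> }
>
> class DeploymentFailedExceptionSketch extends RuntimeException {
>     DeploymentFailedExceptionSketch(String msg) { super(msg); }
> }
>
> class ControllerSketch {
>     void reconcile(StatusSketch status, Runnable submit) {
>         try {
>             submit.run(); // may throw before anything was actually deployed
>         } catch (DeploymentFailedExceptionSketch e) {
>             // Problem: the status is rewritten even though no new deployment exists,
>             // so the next loop reports "Invalid status for deployment" instead of
>             // surfacing the original error.
>             status.jmDeploymentStatus = JmDeploymentState.ERROR;
>             status.jobState = "RECONCILING";
>         }
>     }
> }
> {code}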
> To avoid this we should introduce a new exception type or use something more
> suitable. In most of these cases we should not touch the jobManagerDeploymentStatus
> or the jobStatus at all and simply retrigger the reconciliation. This will keep the
> CR in an error loop that triggers warning events etc., but that is the expected
> behaviour in these critical failure scenarios.
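> One possible shape of the fix, as a sketch only (the exception name and handler
> structure are illustrative, not a committed design): a dedicated exception for
> pre-submission / recovery failures that the controller merely reports as a warning
> event and then retries, without mutating the deployment or job status.
> {code:java}
> import java.util.function.Consumer;
>
> // Sketch only: illustrative names, not actual operator classes.
> class UpgradeFailureExceptionSketch extends RuntimeException {
>     final String reason;
>     UpgradeFailureExceptionSketch(String message, String reason) {
>         super(message);
>         this.reason = reason;
>     }
> }
>
> class ReconcilerSketch {
>     /** Returns true if the reconciliation should simply be retriggered later. */
>     boolean reconcileOnce(Runnable deployAction, Consumer<String> emitWarningEvent) {
>         try {
>             deployAction.run();
>             return false;
>         } catch (UpgradeFailureExceptionSketch e) {
>             // Only record a warning event; do not touch jobManagerDeploymentStatus
>             // or jobStatus, so the last reconciled state stays intact.
>             emitWarningEvent.accept(e.reason + ": " + e.getMessage());
>             return true; // ask the framework to retrigger the reconciliation
>         }
>     }
> }
> {code}
> Keeping the status untouched would mean the last reconciled spec remains the source
> of truth for any later manual recovery, which seems preferable to the current
> "Invalid status for deployment" dead end.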
--
This message was sent by Atlassian Jira
(v8.20.10#820010)