[ 
https://issues.apache.org/jira/browse/FLINK-30268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-30268:
-------------------------------
    Issue Type: Bug  (was: Improvement)

> HA metadata and other cluster submission related errors should not throw 
> DeploymentFailedException
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30268
>                 URL: https://issues.apache.org/jira/browse/FLINK-30268
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Gyula Fora
>            Assignee: Peter Vary
>            Priority: Blocker
>             Fix For: kubernetes-operator-1.3.0
>
>
> Currently, most critical cluster submission errors, as well as the checks 
> that validate HA metadata before deployment, end up throwing 
> DeploymentFailedException.
> This puts the operator into an inconsistent state and actually hides the 
> error in subsequent reconciliation loops:
> {noformat}
> flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | 
> UPGRADING       | The resource is being upgraded 
> flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Info    | SUBMIT 
>          | Starting deployment
> flink-kubernetes-operator 2022-12-01 21:55:03,992 
> o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example] 
> Deploying application cluster requiring last-state from HA metadata
> flink-kubernetes-operator 2022-12-01 21:55:03,997 
> o.a.f.k.o.c.FlinkDeploymentController 
> [ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
> flink-kubernetes-operator 
> org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA 
> metadata not available to restore from last state. It is possible that the 
> job has finished or terminally failed, or the configmaps have been deleted. 
> Manual restore required.
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> flink-kubernetes-operator     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> flink-kubernetes-operator     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown 
> Source)
> flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | 
> RESTOREFAILED   | HA metadata not available to restore from last state. It is 
> possible that the job has finished or terminally failed, or the configmaps 
> have been deleted. Manual restore required.
> flink-kubernetes-operator 2022-12-01 21:55:04,034 
> o.a.f.k.o.c.FlinkDeploymentController [INFO 
> ][default/basic-checkpoint-ha-example] End of reconciliation
> flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | 
> UPGRADING       | 
> {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA
>  metadata not available to restore from last state. It is possible that the 
> job has finished or terminally failed, or the configmaps have been deleted. 
> Manual restore 
> required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]}
>  
> flink-kubernetes-operator 2022-12-01 21:55:19,056 
> o.a.f.k.o.c.FlinkDeploymentController [INFO 
> ][default/basic-checkpoint-ha-example] Starting reconciliation
> flink-kubernetes-operator 2022-12-01 21:55:19,058 
> o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
> ][default/basic-checkpoint-ha-example] UPGRADE change(s) detected 
> (FlinkDeploymentSpec[job.state=RUNNING] differs from 
> FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
> flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | 
> UPGRADING       | The resource is being upgraded 
> flink-kubernetes-operator 2022-12-01 21:55:19,119 
> o.a.f.k.o.r.d.ApplicationReconciler 
> [ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment: 
> FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample,
>  jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING, 
> startTime=1669931677233, updateTime=1669931696153, 
> savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null, 
> triggerTimestamp=null, triggerType=null, formatType=null, 
> savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error=null), 
> clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @ 
> 2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR, 
> reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059,
>  
> lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}},
>  lastStableSpec=null, state=UPGRADING)), 
> taskManager=TaskManagerInfo(labelSelector=, replicas=0))
> flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | 
> CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
> flink-kubernetes-operator 2022-12-01 21:55:19,136 
> o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example] 
> Attempt count: 0, last attempt: false
> flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils      
>    [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | 
> UPGRADING       | 
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException:
>  This indicates a 
> bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This 
> indicates a bug..."}]} 
> flink-kubernetes-operator 2022-12-01 21:55:19,164 
> i.j.o.p.e.ReconciliationDispatcher 
> [ERROR][default/basic-checkpoint-ha-example] Error during event processing 
> ExecutionScope{ resource id: ResourceID{name='basic-checkpoint-ha-example', 
> namespace='default'}, version: 350553} failed.
> flink-kubernetes-operator 
> org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
> java.lang.RuntimeException: This indicates a bug...
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> flink-kubernetes-operator     at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> flink-kubernetes-operator     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> flink-kubernetes-operator     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown 
> Source)
> flink-kubernetes-operator Caused by: java.lang.RuntimeException: This 
> indicates a bug...
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
> flink-kubernetes-operator     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
> flink-kubernetes-operator     ... 13 more
> {noformat}
> The root cause is that DeploymentFailedException was originally introduced 
> so that the observer could signal a JobManager deployment failure (after the 
> job was submitted). The controller's error handler therefore updates the 
> jmDeploymentStatus and the job state, which is what causes the problem here.
> To avoid this, we should introduce a new exception type or use something 
> more suitable. In most of these cases we should not touch the 
> jobManagerDeploymentStatus or the job status, but simply retrigger the 
> reconciliation. This will keep the CR in an error loop that triggers 
> warnings etc., but that is expected in these critical failure scenarios.
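
A minimal sketch of the proposed direction (all class and method names below are hypothetical and do not match the actual flink-kubernetes-operator API): distinguish pre-submission failures from observed post-submission deployment failures, so the error handler can retrigger reconciliation without mutating the deployment status.

```java
// Hypothetical sketch, assuming a simplified status model; not the real
// flink-kubernetes-operator classes.

/** Thrown for failures detected before submission, e.g. missing HA metadata. */
class RecoverableDeploymentException extends RuntimeException {
    RecoverableDeploymentException(String message) { super(message); }
}

/** Thrown when an already-submitted JobManager deployment fails (observer path). */
class DeploymentFailedException extends RuntimeException {
    DeploymentFailedException(String message) { super(message); }
}

/** Stand-in for the CR status fields the current handler mutates. */
class MockStatus {
    String jmDeploymentStatus = "DEPLOYING";
    String jobState = "RECONCILING";
}

class ErrorHandler {
    /** Returns true if the reconciliation should simply be retried. */
    static boolean handle(RuntimeException e, MockStatus status) {
        if (e instanceof RecoverableDeploymentException) {
            // Pre-submission failure: leave jmDeploymentStatus and job state
            // untouched and retrigger reconciliation. The CR stays in an
            // error loop, surfacing warnings, until the spec is fixed.
            return true;
        }
        if (e instanceof DeploymentFailedException) {
            // Post-submission failure observed on the running deployment:
            // recording the error state is the intended behavior here.
            status.jmDeploymentStatus = "ERROR";
            return false;
        }
        throw e;
    }
}
```

The key design point is that only the second branch, the one the exception type was originally created for, touches the status; submission-time validation errors never do.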



--
This message was sent by Atlassian Jira
(v8.20.10#820010)