[ https://issues.apache.org/jira/browse/FLINK-32340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergii Nazarov updated FLINK-32340:
-----------------------------------
    Description: 
Prerequisites:
 * Deployment via the Apache Flink Kubernetes Operator, version 1.5.0
 * Deployment using a FlinkDeployment spec
 * Upgrade mode: savepoint
 * Configuration property "kubernetes.operator.job.upgrade.last-state-fallback.enabled" set to true
 * Flink version 1.15.4

Steps to reproduce:
 # Deploy an app.
 # Optionally, wait until the app has created a checkpoint (this does not change the outcome, even with "kubernetes.operator.job.upgrade.last-state-fallback.enabled" set to true).
 # Deploy a new version of the app containing an error that makes its main method throw an exception.

Exception that triggers the operator NPE:
{code:none}
o.a.f.k.o.o.JobStatusObserver [ERROR][flink-apps/myApp] Job 0d78a62fe581b047510e28f26393a7ce failed with error: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Consumer does not exist
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
{code}
NPE in the K8s operator:
{code:none}
o.a.f.k.o.l.AuditUtils        [INFO ][flink-apps/myApp] >>> Event  | Info    | JOBSTATUSCHANGED | Job status changed from RECONCILING to FAILED
o.a.f.k.o.o.SavepointObserver [ERROR][flink-apps/myApp] Could not observe latest savepoint information.
java.lang.NullPointerException
        at org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper.getInProgressCheckpoint(CheckpointHistoryWrapper.java:60)
        at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getCheckpointInfo(AbstractFlinkService.java:564)
        at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getLastCheckpoint(AbstractFlinkService.java:520)
        at org.apache.flink.kubernetes.operator.observer.SavepointObserver.observeLatestSavepoint(SavepointObserver.java:209)
        at org.apache.flink.kubernetes.operator.observer.SavepointObserver.observeSavepointStatus(SavepointObserver.java:73)
        at org.apache.flink.kubernetes.operator.observer.deployment.ApplicationObserver.observeFlinkCluster(ApplicationObserver.java:61)
        at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:73)
        at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:53)
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:134)
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:57)
        at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
        at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
        at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
        at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
        at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
        at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
{code}
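In the trace above, the NPE is thrown inside CheckpointHistoryWrapper.getInProgressCheckpoint. At that point the operator is reading the checkpoint history of a job that failed in its main method. The operator's source is not quoted in this report. The following is therefore only a hypothetical sketch of the kind of null guard that would let this lookup return "no checkpoint" instead of throwing. The classes CheckpointEntry and CheckpointHistorySketch are illustrative and are not the operator's code. The assumption that a null history list is the missing piece is also illustrative.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of one checkpoint-history entry.
// These names are illustrative; they are NOT the operator's real classes.
final class CheckpointEntry {
    final long id;
    final String status; // e.g. "IN_PROGRESS", "COMPLETED" or "FAILED"

    CheckpointEntry(long id, String status) {
        this.id = id;
        this.status = status;
    }
}

// Hypothetical stand-in for a checkpoint-history wrapper.
final class CheckpointHistorySketch {
    // Assumption: may be null when no checkpoint history was returned,
    // e.g. for a job whose main method failed before the job ran.
    private final List<CheckpointEntry> history;

    CheckpointHistorySketch(List<CheckpointEntry> history) {
        this.history = history;
    }

    // Returns the newest in-progress checkpoint, or Optional.empty() when there
    // is none, instead of dereferencing a missing history list.
    Optional<CheckpointEntry> getInProgressCheckpoint() {
        if (history == null || history.isEmpty()) {
            return Optional.empty();
        }
        return history.stream()
                .filter(e -> e != null && "IN_PROGRESS".equals(e.status))
                .max(Comparator.comparingLong((CheckpointEntry e) -> e.id));
    }
}
```

Returning Optional.empty() would let a caller treat a job without checkpoint history as "nothing to observe" rather than failing the whole savepoint observation step.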
What it breaks:
 # The operator loses the latest savepoint/checkpoint information, so you cannot deploy a fixed version of the app. To work around this, you have to delete the current deployment entirely, manually find the latest checkpoint for the job ID in S3 (or whichever checkpoint storage you use), and create a new deployment whose initialSavepointPath points to that checkpoint.
 # The operator gets stuck deleting the app when it is deleted with the command
{noformat}
kubectl delete flinkdeployment <deployment name>{noformat}
To work around this, find and delete the corresponding Kubernetes Deployment resource:
{noformat}
kubectl delete deployment <deployment name>{noformat}
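The manual recovery in the first item can be partly scripted once you have a listing of the checkpoint storage, for S3 typically a recursive object listing. The sketch below assumes Flink's file-based checkpoint layout, <checkpoint dir>/<job ID>/chk-<n>/_metadata. It treats a chk-<n> directory without a _metadata file as incomplete. The class and method names are hypothetical. The method returns the chk-<n> directory with the highest n for the given job ID, which is a candidate value for initialSavepointPath.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: picks the latest complete checkpoint for one job from a
// flat listing of object keys/paths. Assumes Flink's file-based layout
// <checkpoint dir>/<job ID>/chk-<n>/_metadata.
final class LatestCheckpointFinder {

    static Optional<String> latestCheckpointPath(List<String> keys, String jobId) {
        // Group 1: the chk-<n> directory; group 2: the numeric checkpoint ID n.
        // Only keys ending in /_metadata count, so incomplete directories are skipped.
        Pattern pattern = Pattern.compile(
                "^((?:.*/)?" + Pattern.quote(jobId) + "/chk-(\\d+))/_metadata$");
        String best = null;
        long bestId = -1;
        for (String key : keys) {
            Matcher m = pattern.matcher(key);
            if (m.matches()) {
                long id = Long.parseLong(m.group(2));
                if (id > bestId) { // checkpoint IDs increase within a job
                    bestId = id;
                    best = m.group(1);
                }
            }
        }
        return Optional.ofNullable(best);
    }
}
```

The sketch selects by the highest checkpoint ID rather than by modification time, because IDs increase monotonically within a job. It is still worth verifying that the chosen checkpoint is usable before pointing a new deployment at it.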

  was:
Same as the current description above, but without the "Flink version 1.15.4" prerequisite.

> NPE in K8s operator which breaks current and subsequent deployments
> -------------------------------------------------------------------
>
>                 Key: FLINK-32340
>                 URL: https://issues.apache.org/jira/browse/FLINK-32340
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.5.0
>            Reporter: Sergii Nazarov
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
