This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new fcfa54bed Fix the issue where the state sometimes remains as 
"canceled" continuously. (#4031)
fcfa54bed is described below

commit fcfa54bed14ea92c27b64a744898a00af0175c79
Author: Darcy <[email protected]>
AuthorDate: Wed Sep 4 18:23:21 2024 +0800

    Fix the issue where the state sometimes remains as "canceled" continuously. 
(#4031)
    
    * Fix the issue where the state sometimes remains as "canceled" 
continuously.
    
    * Remove the watchController.trackIds.invalidate statement; this method 
should be called in the unwatch function.
---
 .../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala  | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index ec0aa6a82..aa82962f3 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -305,21 +305,18 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    */
   private def inferStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
       pollEmitTime: Long): Option[JobStatusCV] = {
-
     // infer from k8s deployment and event
     val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
+    // whether deployment exists on kubernetes cluster
+    val deployExists = KubernetesRetriever.isDeploymentExists(
+      trackId.namespace,
+      trackId.clusterId
+    )
     val jobState = trackId match {
       case id if watchController.canceling.has(id) =>
         logger.info(s"trackId ${trackId.toString} is canceling")
-        watchController.trackIds.invalidate(id)
-        FlinkJobState.CANCELED
+        if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
       case _ =>
-        // whether deployment exists on kubernetes cluster
-        val deployExists = KubernetesRetriever.isDeploymentExists(
-          trackId.namespace,
-          trackId.clusterId
-        )
-
         val isConnection = KubernetesDeploymentHelper.checkConnection()
 
         if (deployExists) {

Reply via email to