This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit a751c6960d89a065c8b652cd886d9bf3cb9510f2
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 15 21:33:49 2023 +0800

    flink cluster improvement
---
 .../streampark/common/enums/ClusterState.java      |  8 ----
 .../streampark/common/util/HadoopUtils.scala       |  6 ++-
 .../console/core/service/FlinkClusterService.java  |  4 --
 .../core/service/impl/FlinkClusterServiceImpl.java | 40 ++++++++++---------
 .../console/core/task/FlinkClusterWatcher.java     | 46 +++++++++-------------
 5 files changed, 44 insertions(+), 60 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 7794f7f95..2aef409c1 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -67,12 +67,4 @@ public enum ClusterState implements Serializable {
   public static boolean isRunning(ClusterState state) {
     return RUNNING.equals(state);
   }
-
-  public static boolean isFailed(ClusterState state) {
-    return state == ClusterState.FAILED
-        || state == ClusterState.LOST
-        || state == ClusterState.UNKNOWN
-        || state == ClusterState.KILLED
-        || state == ClusterState.CANCELED;
-  }
 }
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 7616952b8..b9823d8f0 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.service.Service.STATE
-import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
@@ -290,6 +290,10 @@ object HadoopUtils extends Logger {
     new File(destPath.toString).getAbsolutePath
   }
 
+  def toYarnState(state: String): YarnApplicationState = {
+    YarnApplicationState.values.find(_.name() == state).orNull
+  }
+
   private class HadoopConfiguration extends Configuration {
 
     private lazy val rewriteNames = List(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 1c62806db..5f47c60a5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -37,12 +37,8 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
   void update(FlinkCluster flinkCluster);
 
-  void starting(FlinkCluster flinkCluster);
-
   void start(FlinkCluster flinkCluster);
 
-  void canceling(FlinkCluster flinkCluster);
-
   void shutdown(FlinkCluster flinkCluster);
 
   Boolean existsByClusterId(String clusterId, Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index c5eba5b3f..f3dda6a56 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -155,18 +155,11 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     return ret;
   }
 
-  @Override
-  public void starting(FlinkCluster flinkCluster) {
-    flinkCluster.setClusterState(ClusterState.STARTING.getValue());
-    flinkCluster.setStartTime(new Date());
-    updateById(flinkCluster);
-  }
-
   @Override
   @Transactional(rollbackFor = {Exception.class})
   public void start(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
-    starting(flinkCluster);
+    updateClusterState(cluster.getId(), ClusterState.STARTING);
     try {
       DeployResponse deployResponse = deployInternal(flinkCluster);
       ApiAlertException.throwIfNull(
@@ -185,8 +178,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
       flinkCluster.setException(null);
       flinkCluster.setEndTime(null);
-      FlinkClusterWatcher.addWatching(flinkCluster);
       updateById(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setClusterState(ClusterState.FAILED.getValue());
@@ -228,12 +221,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     updateById(flinkCluster);
   }
 
-  @Override
-  public void canceling(FlinkCluster flinkCluster) {
-    flinkCluster.setClusterState(ClusterState.CANCELING.getValue());
-    updateById(flinkCluster);
-  }
-
   @Override
   public void shutdown(FlinkCluster cluster) {
     FlinkCluster flinkCluster = this.getById(cluster.getId());
@@ -250,15 +237,15 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     ApiAlertException.throwIfTrue(
         existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
 
-    canceling(flinkCluster);
+    updateClusterState(flinkCluster.getId(), ClusterState.CANCELING);
     try {
       // 4) shutdown
       ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
       flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
       flinkCluster.setEndTime(new Date());
-      FlinkClusterWatcher.unWatching(flinkCluster);
       updateById(flinkCluster);
+      FlinkClusterWatcher.unWatching(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setException(e.toString());
@@ -302,8 +289,23 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     LambdaUpdateWrapper<FlinkCluster> updateWrapper =
         new LambdaUpdateWrapper<FlinkCluster>()
             .eq(FlinkCluster::getId, id)
-            .set(FlinkCluster::getClusterState, state.getValue())
-            .set(FlinkCluster::getEndTime, new Date());
+            .set(FlinkCluster::getClusterState, state.getValue());
+
+    switch (state) {
+      case KILLED:
+      case UNKNOWN:
+      case LOST:
+      case FAILED:
+      case CANCELED:
+        updateWrapper.set(FlinkCluster::getEndTime, new Date());
+        break;
+      case STARTING:
+        updateWrapper.set(FlinkCluster::getStartTime, new Date());
+        break;
+      default:
+        break;
+    }
+
     update(updateWrapper);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index b6351fd7f..9b256742c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
@@ -111,10 +112,17 @@ public class FlinkClusterWatcher {
               EXECUTOR.execute(
                   () -> {
                     ClusterState state = getClusterState(flinkCluster);
-                    if (ClusterState.isFailed(state)) {
-                      flinkClusterService.updateClusterState(flinkCluster.getId(), state);
-                      unWatching(flinkCluster);
-                      alert(flinkCluster, state);
+                    switch (state) {
+                      case FAILED:
+                      case LOST:
+                      case UNKNOWN:
+                      case KILLED:
+                        flinkClusterService.updateClusterState(flinkCluster.getId(), state);
+                        unWatching(flinkCluster);
+                        alert(flinkCluster, state);
+                        break;
+                      default:
+                        break;
                     }
                   }));
     }
@@ -177,8 +185,8 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
-    final ClusterState state = getStateFromFlinkRestApi(flinkCluster);
-    if (ClusterState.isFailed(state)) {
+    ClusterState state = getStateFromFlinkRestApi(flinkCluster);
+    if (ClusterState.LOST == state) {
       return getStateFromYarnRestApi(flinkCluster);
     }
     return state;
@@ -191,12 +199,9 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
-    final String address = flinkCluster.getAddress();
-    if (StringUtils.isEmpty(address)) {
-      return ClusterState.CANCELED;
-    }
-    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
-    final String flinkUrl =
+    String address = flinkCluster.getAddress();
+    String jobManagerUrl = flinkCluster.getJobManagerUrl();
+    String flinkUrl =
         StringUtils.isEmpty(jobManagerUrl)
             ? address.concat("/overview")
             : jobManagerUrl.concat("/overview");
@@ -227,7 +232,7 @@ public class FlinkClusterWatcher {
         return ClusterState.UNKNOWN;
       }
       YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
-      YarnApplicationState status = stringConvertYarnState(yarnAppInfo.getApp().getState());
+      YarnApplicationState status = HadoopUtils.toYarnState(yarnAppInfo.getApp().getState());
       if (status == null) {
         log.error(
             "cluster id:{} final application status convert failed, invalid string ",
@@ -260,21 +265,6 @@ public class FlinkClusterWatcher {
     }
   }
 
-  /**
-   * string converse yarn application state
-   *
-   * @param value
-   * @return
-   */
-  private YarnApplicationState stringConvertYarnState(String value) {
-    for (YarnApplicationState state : YarnApplicationState.values()) {
-      if (state.name().equals(value)) {
-        return state;
-      }
-    }
-    return null;
-  }
-
   /**
    * yarn application state convert cluster state
    *

Reply via email to