This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4b9d8817d [ISSUE-3129][Improve] Unified enumeration comparison methods (#3135)
4b9d8817d is described below

commit 4b9d8817d9a7044e38523c6acb74b54c609bc7d8
Author: xxt <[email protected]>
AuthorDate: Tue Sep 19 02:05:11 2023 +0800

    [ISSUE-3129][Improve] Unified enumeration comparison methods (#3135)
    
    * [ISSUE-3129][Improve] Unified enumeration comparison methods #3129
    
    
    ---------
    
    Co-authored-by: xujiawei <[email protected]>
---
 .../streampark/common/enums/ExecutionMode.java     | 16 +++----
 .../console/core/entity/Application.java           |  2 +-
 .../console/core/entity/FlinkCluster.java          |  2 +-
 .../console/core/enums/FlinkAppState.java          |  2 +-
 .../console/core/enums/GitCredential.java          |  2 +-
 .../console/core/runner/EnvInitializer.java        |  2 +-
 .../impl/ApplicationActionServiceImpl.java         | 12 +++---
 .../impl/ApplicationInfoServiceImpl.java           |  6 +--
 .../impl/ApplicationManageServiceImpl.java         |  4 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |  4 +-
 .../service/impl/ApplicationConfigServiceImpl.java |  2 +-
 .../core/service/impl/ProjectServiceImpl.java      |  4 +-
 .../core/service/impl/ResourceServiceImpl.java     |  2 +-
 .../core/service/impl/SavePointServiceImpl.java    |  5 +--
 .../console/core/task/FlinkAppHttpWatcher.java     | 49 +++++++++++-----------
 .../core/task/FlinkCheckpointProcessor.java        |  6 +--
 .../core/task/FlinkK8sChangeEventListener.java     | 10 ++---
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  4 +-
 .../system/service/impl/MenuServiceImpl.java       |  4 +-
 .../system/service/impl/TeamServiceImpl.java       |  2 +-
 .../flink/client/impl/YarnSessionClient.scala      |  2 +-
 .../flink/client/trait/FlinkClientTrait.scala      |  4 +-
 22 files changed, 71 insertions(+), 75 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
index 9ce12ea28..889cb915d 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
@@ -74,17 +74,17 @@ public enum ExecutionMode {
   }
 
   public static boolean isYarnMode(ExecutionMode mode) {
-    return YARN_PER_JOB.equals(mode) || YARN_APPLICATION.equals(mode) || 
YARN_SESSION.equals(mode);
+    return YARN_PER_JOB == mode || YARN_APPLICATION == mode || YARN_SESSION == 
mode;
   }
 
   // TODO: We'll inline this method back to the corresponding caller lines
   //  after dropping the yarn perjob mode.
   public static boolean isYarnPerJobOrAppMode(ExecutionMode mode) {
-    return YARN_PER_JOB.equals(mode) || YARN_APPLICATION.equals(mode);
+    return YARN_PER_JOB == mode || YARN_APPLICATION == mode;
   }
 
   public static boolean isYarnSessionMode(ExecutionMode mode) {
-    return YARN_SESSION.equals(mode);
+    return YARN_SESSION == mode;
   }
 
   public static boolean isYarnMode(Integer value) {
@@ -92,11 +92,11 @@ public enum ExecutionMode {
   }
 
   public static boolean isKubernetesSessionMode(Integer value) {
-    return KUBERNETES_NATIVE_SESSION.equals(of(value));
+    return KUBERNETES_NATIVE_SESSION == of(value);
   }
 
   public static boolean isKubernetesMode(ExecutionMode mode) {
-    return KUBERNETES_NATIVE_SESSION.equals(mode) || 
KUBERNETES_NATIVE_APPLICATION.equals(mode);
+    return KUBERNETES_NATIVE_SESSION == mode || KUBERNETES_NATIVE_APPLICATION 
== mode;
   }
 
   public static boolean isKubernetesMode(Integer value) {
@@ -104,7 +104,7 @@ public enum ExecutionMode {
   }
 
   public static boolean isKubernetesApplicationMode(Integer value) {
-    return KUBERNETES_NATIVE_APPLICATION.equals(of(value));
+    return KUBERNETES_NATIVE_APPLICATION == of(value);
   }
 
   public static List<Integer> getKubernetesMode() {
@@ -113,7 +113,7 @@ public enum ExecutionMode {
   }
 
   public static boolean isSessionMode(ExecutionMode mode) {
-    return KUBERNETES_NATIVE_SESSION.equals(mode) || YARN_SESSION.equals(mode);
+    return KUBERNETES_NATIVE_SESSION == mode || YARN_SESSION == mode;
   }
 
   public static boolean isRemoteMode(Integer value) {
@@ -121,6 +121,6 @@ public enum ExecutionMode {
   }
 
   public static boolean isRemoteMode(ExecutionMode mode) {
-    return REMOTE.equals(mode);
+    return REMOTE == mode;
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 98fd8dd53..0c1593662 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -569,7 +569,7 @@ public class Application implements Serializable {
   }
 
   private boolean needFillYarnQueueLabel(ExecutionMode mode) {
-    return ExecutionMode.YARN_PER_JOB.equals(mode) || 
ExecutionMode.YARN_APPLICATION.equals(mode);
+    return ExecutionMode.YARN_PER_JOB == mode || 
ExecutionMode.YARN_APPLICATION == mode;
   }
 
   @Override
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 390607476..6243d6f55 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -136,7 +136,7 @@ public class FlinkCluster implements Serializable {
       return Collections.emptyMap();
     }
     Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
-    if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
+    if (ExecutionMode.YARN_SESSION == getExecutionModeEnum()) {
       map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
       map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue));
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 59dc4c6f5..4875d5969 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -153,7 +153,7 @@ public enum FlinkAppState {
   public static class Bridge {
     /** covert from org.apache.streampark.flink.k8s.enums.FlinkJobState */
     public static FlinkAppState fromK8sFlinkJobState(Enumeration.Value 
flinkJobState) {
-      if (FlinkJobState.K8S_INITIALIZING().equals(flinkJobState)) {
+      if (FlinkJobState.K8S_INITIALIZING() == flinkJobState) {
         return INITIALIZING;
       } else {
         return of(flinkJobState.toString());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
index 36a19a90f..748077ff3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
@@ -34,7 +34,7 @@ public enum GitCredential {
   }
 
   public static boolean isSSH(Integer gitCredential) {
-    return GitCredential.SSH.equals(GitCredential.of(gitCredential));
+    return GitCredential.SSH == GitCredential.of(gitCredential);
   }
 
   public Integer getValue() {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index f854eb6a4..c4cb68a27 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -136,7 +136,7 @@ public class EnvInitializer implements ApplicationRunner {
     Workspace workspace = Workspace.of(storageType);
 
     // 1. prepare workspace dir
-    if (storageType.equals(LFS)) {
+    if (LFS == storageType) {
       fsOperator.mkdirsIfNotExists(Workspace.APP_LOCAL_DIST());
     }
     Arrays.asList(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 728f3b7f4..c17a9fd51 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -268,7 +268,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
       clusterId = application.getClusterId();
     } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-      if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+      if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()) {
         FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
         ApiAlertException.throwIfNull(
             cluster,
@@ -438,7 +438,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     String appConf = userJarAndAppConf.f1;
 
     BuildResult buildResult = buildPipeline.getBuildResult();
-    if 
(ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
+    if (ExecutionMode.YARN_APPLICATION == application.getExecutionModeEnum()) {
       buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
     }
 
@@ -591,7 +591,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
                 ? null
                 : String.format("yaml://%s", applicationConfig.getContent());
         // 3) client
-        if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+        if (ExecutionMode.YARN_APPLICATION == executionMode) {
           String clientPath = Workspace.remote().APP_CLIENT();
           flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
         }
@@ -624,7 +624,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
           switch (application.getApplicationType()) {
             case STREAMPARK_FLINK:
               ConfigFileType fileType = 
ConfigFileType.of(applicationConfig.getFormat());
-              if (fileType != null && 
!fileType.equals(ConfigFileType.UNKNOWN)) {
+              if (fileType != null && ConfigFileType.UNKNOWN != fileType) {
                 appConf =
                     String.format(
                         "%s://%s", fileType.getTypeName(), 
applicationConfig.getContent());
@@ -645,7 +645,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
           }
         }
 
-        if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+        if (ExecutionMode.YARN_APPLICATION == executionMode) {
           switch (application.getApplicationType()) {
             case STREAMPARK_FLINK:
               flinkUserJar =
@@ -690,7 +690,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
       properties.put(RestOptions.PORT.key(), activeAddress.getPort());
     } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
-      if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+      if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()) {
         FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
         ApiAlertException.throwIfNull(
             cluster,
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index ca7ad1e04..c4e1b27d3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -202,8 +202,8 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
       envInitializer.checkFlinkEnv(application.getStorageType(), flinkEnv);
       envInitializer.storageInitialize(application.getStorageType());
 
-      if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
-          || ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+      if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()
+          || ExecutionMode.REMOTE == application.getExecutionModeEnum()) {
         FlinkCluster flinkCluster = 
flinkClusterService.getById(application.getFlinkClusterId());
         boolean conned = 
flinkClusterWatcher.verifyClusterConnection(flinkCluster);
         if (!conned) {
@@ -247,7 +247,7 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
             .anyMatch(
                 application ->
                     clusterId.equals(application.getFlinkClusterId())
-                        && 
FlinkAppState.RUNNING.equals(application.getStateEnum()));
+                        && FlinkAppState.RUNNING == 
application.getStateEnum());
   }
 
   @Override
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 2dc38afb2..4a46462ff 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -497,8 +497,8 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
     // 4) yarn application mode change
     if (!application.getBuild()) {
       if (!application.getExecutionMode().equals(appParam.getExecutionMode())) 
{
-        if 
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
-            || 
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
+        if (ExecutionMode.YARN_APPLICATION == appParam.getExecutionModeEnum()
+            || ExecutionMode.YARN_APPLICATION == 
application.getExecutionModeEnum()) {
           application.setBuild(true);
         }
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 65d6650e2..4e0bfa483 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -443,8 +443,8 @@ public class AppBuildPipeServiceImpl
       case YARN_APPLICATION:
         String yarnProvidedPath = app.getAppLib();
         String localWorkspace = app.getLocalAppHome().concat("/lib");
-        if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
-            && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
+        if (DevelopmentMode.CUSTOM_CODE == app.getDevelopmentMode()
+            && ApplicationType.APACHE_FLINK == app.getApplicationType()) {
           yarnProvidedPath = app.getAppHome();
           localWorkspace = app.getLocalAppHome();
         }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 5b560fc29..56f778b1c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -74,7 +74,7 @@ public class ApplicationConfigServiceImpl
 
     if (application.getFormat() != null) {
       ConfigFileType fileType = ConfigFileType.of(application.getFormat());
-      if (fileType == null || ConfigFileType.UNKNOWN.equals(fileType)) {
+      if (fileType == null || ConfigFileType.UNKNOWN == fileType) {
         throw new ApiAlertException(
             "application' config error. must be (.properties|.yaml|.yml 
|.conf)");
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 254818045..416e162b8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -141,7 +141,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
     }
     if (projectParam.getBuildState() != null) {
       project.setBuildState(projectParam.getBuildState());
-      if 
(BuildState.of(projectParam.getBuildState()).equals(BuildState.NEED_REBUILD)) {
+      if (BuildState.NEED_REBUILD == 
BuildState.of(projectParam.getBuildState())) {
         List<Application> applications = getApplications(project);
         // Update deployment status
         applications.forEach(
@@ -235,7 +235,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
     Project project = getById(id);
     Utils.notNull(project);
 
-    if (!BuildState.SUCCESSFUL.equals(BuildState.of(project.getBuildState()))
+    if (BuildState.SUCCESSFUL != BuildState.of(project.getBuildState())
         || !project.getDistHome().exists()) {
       return Collections.emptyList();
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 24ed747c8..c5e35c8b6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -304,7 +304,7 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
         FlinkConnector connectorResource;
 
         ApiAlertException.throwIfFalse(
-            ResourceType.CONNECTOR.equals(resourceParam.getResourceType()),
+            ResourceType.CONNECTOR == resourceParam.getResourceType(),
             "getConnectorId method error, resource not flink connector.");
 
         List<File> jars;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 16d09a246..67d609fe3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -306,7 +306,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
       return application.getClusterId();
     } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-      if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+      if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()) {
         Utils.notNull(
             cluster,
             String.format(
@@ -445,8 +445,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     int cpThreshold =
         
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
             .orElse(getChkNumRetainedFromFlinkEnv(flinkEnv, application));
-    cpThreshold =
-        CHECKPOINT.equals(CheckPointType.of(entity.getType())) ? cpThreshold - 
1 : cpThreshold;
+    cpThreshold = CHECKPOINT == CheckPointType.of(entity.getType()) ? 
cpThreshold - 1 : cpThreshold;
 
     if (cpThreshold == 0) {
       LambdaQueryWrapper<SavePoint> queryWrapper =
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index ce60a1919..4bfb75127 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -230,12 +230,12 @@ public class FlinkAppHttpWatcher {
                Query from flink's restAPI and yarn's restAPI both failed.
                In this case, it is necessary to decide whether to return to 
the final state depending on the state being operated
               */
-              if (optionState == null || 
!optionState.equals(OptionState.STARTING)) {
+              if (optionState == null || OptionState.STARTING != optionState) {
                 // non-mapping
                 if (application.getState() != 
FlinkAppState.MAPPING.getValue()) {
                   log.error(
                       "FlinkAppHttpWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
-                  if (StopFrom.NONE.equals(stopFrom)) {
+                  if (StopFrom.NONE == stopFrom) {
                     savePointService.expire(application.getId());
                     application.setState(FlinkAppState.LOST.getValue());
                     doAlert(application, FlinkAppState.LOST);
@@ -253,9 +253,9 @@ public class FlinkAppHttpWatcher {
                 cleanOptioning(optionState, id);
                 doPersistMetrics(application, true);
                 FlinkAppState appState = application.getStateEnum();
-                if (appState == FlinkAppState.FAILED || appState == 
FlinkAppState.LOST) {
+                if (FlinkAppState.FAILED == appState || FlinkAppState.LOST == 
appState) {
                   doAlert(application, application.getStateEnum());
-                  if (appState.equals(FlinkAppState.FAILED)) {
+                  if (FlinkAppState.FAILED == appState) {
                     try {
                       applicationActionService.start(application, true);
                     } catch (Exception e) {
@@ -280,8 +280,7 @@ public class FlinkAppHttpWatcher {
     JobsOverview jobsOverview = httpJobsOverview(application);
     Optional<JobsOverview.Job> optional;
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_APPLICATION.equals(execMode)
-        || ExecutionMode.YARN_PER_JOB.equals(execMode)) {
+    if (ExecutionMode.YARN_APPLICATION == execMode || 
ExecutionMode.YARN_PER_JOB == execMode) {
       optional =
           jobsOverview.getJobs().size() > 1
               ? jobsOverview.getJobs().stream()
@@ -299,7 +298,7 @@ public class FlinkAppHttpWatcher {
       JobsOverview.Job jobOverview = optional.get();
       FlinkAppState currentState = FlinkAppState.of(jobOverview.getState());
 
-      if (!FlinkAppState.OTHER.equals(currentState)) {
+      if (FlinkAppState.OTHER != currentState) {
         try {
           // 1) set info from JobOverview
           handleJobOverview(application, jobOverview);
@@ -314,7 +313,7 @@ public class FlinkAppHttpWatcher {
         }
         // 3) savePoint obsolete check and NEED_START check
         OptionState optionState = OPTIONING.get(application.getId());
-        if (currentState.equals(FlinkAppState.RUNNING)) {
+        if (FlinkAppState.RUNNING == currentState) {
           handleRunningState(application, optionState, currentState);
         } else {
           handleNotRunState(application, optionState, currentState, stopFrom);
@@ -386,7 +385,7 @@ public class FlinkAppHttpWatcher {
      NEED_RESTART_AFTER_ROLLBACK (Need to restart after rollback)
      NEED_RESTART_AFTER_DEPLOY (Need to rollback after deploy)
     */
-    if (OptionState.STARTING.equals(optionState)) {
+    if (OptionState.STARTING == optionState) {
       Application latestApp = WATCHING_APPS.get(application.getId());
       ReleaseState releaseState = latestApp.getReleaseState();
       switch (releaseState) {
@@ -460,8 +459,8 @@ public class FlinkAppHttpWatcher {
             currentState.name());
         cleanSavepoint(application);
         application.setState(currentState.getValue());
-        if (StopFrom.NONE.equals(stopFrom) || 
applicationInfoService.checkAlter(application)) {
-          if (StopFrom.NONE.equals(stopFrom)) {
+        if (StopFrom.NONE == stopFrom || 
applicationInfoService.checkAlter(application)) {
+          if (StopFrom.NONE == stopFrom) {
             log.info(
                 "FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not 
form StreamPark,savePoint expired!");
             savePointService.expire(application.getId());
@@ -512,7 +511,7 @@ public class FlinkAppHttpWatcher {
     Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
     if (flag != null) {
       log.info("FlinkAppHttpWatcher previous state: canceling.");
-      if (StopFrom.NONE.equals(stopFrom)) {
+      if (StopFrom.NONE == stopFrom) {
         log.error(
             "FlinkAppHttpWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
         savePointService.expire(application.getId());
@@ -525,18 +524,18 @@ public class FlinkAppHttpWatcher {
       // query the status from the yarn rest Api
       YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
       if (yarnAppInfo == null) {
-        if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+        if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
           throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi 
failed ");
         }
       } else {
         try {
           String state = yarnAppInfo.getApp().getFinalStatus();
           FlinkAppState flinkAppState = FlinkAppState.of(state);
-          if (FlinkAppState.OTHER.equals(flinkAppState)) {
+          if (FlinkAppState.OTHER == flinkAppState) {
             return;
           }
-          if (FlinkAppState.KILLED.equals(flinkAppState)) {
-            if (StopFrom.NONE.equals(stopFrom)) {
+          if (FlinkAppState.KILLED == flinkAppState) {
+            if (StopFrom.NONE == stopFrom) {
               log.error(
                   "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and 
stopFrom NotFound,savePoint expired!");
               savePointService.expire(application.getId());
@@ -545,24 +544,24 @@ public class FlinkAppHttpWatcher {
             cleanSavepoint(application);
             application.setEndTime(new Date());
           }
-          if (FlinkAppState.SUCCEEDED.equals(flinkAppState)) {
+          if (FlinkAppState.SUCCEEDED == flinkAppState) {
             flinkAppState = FlinkAppState.FINISHED;
           }
           application.setState(flinkAppState.getValue());
           cleanOptioning(optionState, application.getId());
           doPersistMetrics(application, true);
-          if (flinkAppState.equals(FlinkAppState.FAILED)
-              || flinkAppState.equals(FlinkAppState.LOST)
-              || (flinkAppState.equals(FlinkAppState.CANCELED) && 
StopFrom.NONE.equals(stopFrom))
+          if (FlinkAppState.FAILED == flinkAppState
+              || FlinkAppState.LOST == flinkAppState
+              || (FlinkAppState.CANCELED == flinkAppState && StopFrom.NONE == 
stopFrom)
               || applicationInfoService.checkAlter(application)) {
             doAlert(application, flinkAppState);
             stopCanceledJob(application.getId());
-            if (flinkAppState.equals(FlinkAppState.FAILED)) {
+            if (FlinkAppState.FAILED == flinkAppState) {
               applicationActionService.start(application, true);
             }
           }
         } catch (Exception e) {
-          if 
(!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+          if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
             throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi 
error,", e);
           }
         }
@@ -589,7 +588,7 @@ public class FlinkAppHttpWatcher {
     }
     log.info("FlinkAppHttpWatcher setOptioning");
     OPTIONING.put(appId, state);
-    if (state.equals(OptionState.CANCELLING)) {
+    if (OptionState.CANCELLING == state) {
       STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
     }
   }
@@ -657,8 +656,8 @@ public class FlinkAppHttpWatcher {
   private Overview httpOverview(Application application) throws IOException {
     String appId = application.getAppId();
     if (appId != null) {
-      if (application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
-          || application.getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
+      if (ExecutionMode.YARN_APPLICATION == application.getExecutionModeEnum()
+          || ExecutionMode.YARN_PER_JOB == application.getExecutionModeEnum()) {
         String reqURL;
         if (StringUtils.isBlank(application.getJobManagerUrl())) {
           String format = "proxy/%s/overview";
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
index 5e30557c9..440170c42 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
@@ -82,7 +82,7 @@ public class FlinkCheckpointProcessor {
     CheckPointStatus status = checkPoint.getCheckPointStatus();
     CheckPointKey checkPointKey = new CheckPointKey(appId, jobID, checkPoint.getId());
 
-    if (CheckPointStatus.COMPLETED.equals(status)) {
+    if (CheckPointStatus.COMPLETED == status) {
       if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
         savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE);
         saveSavepoint(checkPoint, application.getId());
@@ -161,9 +161,7 @@ public class FlinkCheckpointProcessor {
 
   private boolean shouldProcessFailedTrigger(
       CheckPoints.CheckPoint checkPoint, boolean cpFailedTrigger, CheckPointStatus status) {
-    return CheckPointStatus.FAILED.equals(status)
-        && !checkPoint.getIsSavepoint()
-        && cpFailedTrigger;
+    return CheckPointStatus.FAILED == status && !checkPoint.getIsSavepoint() && cpFailedTrigger;
   }
 
   private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 45560c35f..c097dfb5b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -105,10 +105,10 @@ public class FlinkK8sChangeEventListener {
 
     // email alerts when necessary
     FlinkAppState state = app.getStateEnum();
-    if (FlinkAppState.FAILED.equals(state)
-        || FlinkAppState.LOST.equals(state)
-        || FlinkAppState.RESTARTING.equals(state)
-        || FlinkAppState.FINISHED.equals(state)) {
+    if (FlinkAppState.FAILED == state
+        || FlinkAppState.LOST == state
+        || FlinkAppState.RESTARTING == state
+        || FlinkAppState.FINISHED == state) {
       executor.execute(
           () -> {
             if (app.getProbing()) {
@@ -131,7 +131,7 @@ public class FlinkK8sChangeEventListener {
     TrackId trackId = event.trackId();
     ExecutionMode mode = FlinkK8sExecuteMode.toExecutionMode(trackId.executeMode());
     // discard session mode change
-    if (ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(mode)) {
+    if (ExecutionMode.KUBERNETES_NATIVE_SESSION == mode) {
       return;
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 2d5a1a339..6c82fe142 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -127,14 +127,14 @@ public class FlinkK8sWatcherWrapper {
     // covert Application to TrackId
     public static TrackId toTrackId(@Nonnull Application app) {
       Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
-      if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
+      if (FlinkK8sExecuteMode.APPLICATION() == mode) {
         return TrackId.onApplication(
             app.getK8sNamespace(),
             app.getClusterId(),
             app.getId(),
             app.getJobId(),
             app.getTeamId().toString());
-      } else if (FlinkK8sExecuteMode.SESSION().equals(mode)) {
+      } else if (FlinkK8sExecuteMode.SESSION() == mode) {
         return TrackId.onSession(
             app.getK8sNamespace(),
             app.getClusterId(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
index fef5064d2..4ce0aaca9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
@@ -66,7 +66,7 @@ public class MenuServiceImpl extends ServiceImpl<MenuMapper, Menu> implements Me
                     new IllegalArgumentException(
                         String.format("The userId [%s] not found", userId)));
     // Admin has the permission for all menus.
-    if (UserType.ADMIN.equals(user.getUserType())) {
+    if (UserType.ADMIN == user.getUserType()) {
       return this.list().stream().map(Menu::getPerms).collect(Collectors.toList());
     }
     return this.baseMapper.findUserPermissions(userId, teamId);
@@ -81,7 +81,7 @@ public class MenuServiceImpl extends ServiceImpl<MenuMapper, Menu> implements Me
                     new IllegalArgumentException(
                         String.format("The userId:[%s] not found", userId)));
     // Admin has the permission for all menus.
-    if (UserType.ADMIN.equals(user.getUserType())) {
+    if (UserType.ADMIN == user.getUserType()) {
       LambdaQueryWrapper<Menu> queryWrapper =
           new LambdaQueryWrapper<Menu>().eq(Menu::getType, "0").orderByAsc(Menu::getOrderNum);
       return this.list(queryWrapper);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
index 9d49c5c1a..f89c3b398 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
@@ -143,7 +143,7 @@ public class TeamServiceImpl extends ServiceImpl<TeamMapper, Team> implements Te
             .orElseThrow(
                 () -> new ApiAlertException(String.format("The userId [%s] not found.", userId)));
     // Admin has the permission for all teams.
-    if (UserType.ADMIN.equals(user.getUserType())) {
+    if (UserType.ADMIN == user.getUserType()) {
       return this.list();
     }
     return baseMapper.findUserTeams(userId);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index d3608e7cf..372251d77 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -212,7 +212,7 @@ object YarnSessionClient extends YarnClientTrait {
             clusterDescriptor.getYarnClient
               .getApplicationReport(ApplicationId.fromString(deployRequest.clusterId))
               .getFinalApplicationStatus
-          if (FinalApplicationStatus.UNDEFINED.equals(applicationStatus)) {
+          if (FinalApplicationStatus.UNDEFINED == applicationStatus) {
             // application is running
             val yarnClient = clusterDescriptor
               .retrieve(ApplicationId.fromString(deployRequest.clusterId))
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 71c4456d3..839520eba 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -514,8 +514,8 @@ trait FlinkClientTrait extends Logger {
     }
 
     if (
-      submitRequest.developmentMode == DevelopmentMode.PYFLINK && !submitRequest.executionMode
-        .equals(ExecutionMode.YARN_APPLICATION)
+      submitRequest.developmentMode == DevelopmentMode.PYFLINK
+      && submitRequest.executionMode != ExecutionMode.YARN_APPLICATION
     ) {
       // python file
       programArgs.add("-py")


Reply via email to