This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4b9d8817d [ISSUE-3129][Improve] Unified enumeration comparison methods
(#3135)
4b9d8817d is described below
commit 4b9d8817d9a7044e38523c6acb74b54c609bc7d8
Author: xxt <[email protected]>
AuthorDate: Tue Sep 19 02:05:11 2023 +0800
[ISSUE-3129][Improve] Unified enumeration comparison methods (#3135)
* [ISSUE-3129][Improve] Unified enumeration comparison methods #3129
---------
Co-authored-by: xujiawei <[email protected]>
---
.../streampark/common/enums/ExecutionMode.java | 16 +++----
.../console/core/entity/Application.java | 2 +-
.../console/core/entity/FlinkCluster.java | 2 +-
.../console/core/enums/FlinkAppState.java | 2 +-
.../console/core/enums/GitCredential.java | 2 +-
.../console/core/runner/EnvInitializer.java | 2 +-
.../impl/ApplicationActionServiceImpl.java | 12 +++---
.../impl/ApplicationInfoServiceImpl.java | 6 +--
.../impl/ApplicationManageServiceImpl.java | 4 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 4 +-
.../service/impl/ApplicationConfigServiceImpl.java | 2 +-
.../core/service/impl/ProjectServiceImpl.java | 4 +-
.../core/service/impl/ResourceServiceImpl.java | 2 +-
.../core/service/impl/SavePointServiceImpl.java | 5 +--
.../console/core/task/FlinkAppHttpWatcher.java | 49 +++++++++++-----------
.../core/task/FlinkCheckpointProcessor.java | 6 +--
.../core/task/FlinkK8sChangeEventListener.java | 10 ++---
.../console/core/task/FlinkK8sWatcherWrapper.java | 4 +-
.../system/service/impl/MenuServiceImpl.java | 4 +-
.../system/service/impl/TeamServiceImpl.java | 2 +-
.../flink/client/impl/YarnSessionClient.scala | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 4 +-
22 files changed, 71 insertions(+), 75 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
index 9ce12ea28..889cb915d 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
@@ -74,17 +74,17 @@ public enum ExecutionMode {
}
public static boolean isYarnMode(ExecutionMode mode) {
- return YARN_PER_JOB.equals(mode) || YARN_APPLICATION.equals(mode) ||
YARN_SESSION.equals(mode);
+ return YARN_PER_JOB == mode || YARN_APPLICATION == mode || YARN_SESSION ==
mode;
}
// TODO: We'll inline this method back to the corresponding caller lines
// after dropping the yarn perjob mode.
public static boolean isYarnPerJobOrAppMode(ExecutionMode mode) {
- return YARN_PER_JOB.equals(mode) || YARN_APPLICATION.equals(mode);
+ return YARN_PER_JOB == mode || YARN_APPLICATION == mode;
}
public static boolean isYarnSessionMode(ExecutionMode mode) {
- return YARN_SESSION.equals(mode);
+ return YARN_SESSION == mode;
}
public static boolean isYarnMode(Integer value) {
@@ -92,11 +92,11 @@ public enum ExecutionMode {
}
public static boolean isKubernetesSessionMode(Integer value) {
- return KUBERNETES_NATIVE_SESSION.equals(of(value));
+ return KUBERNETES_NATIVE_SESSION == of(value);
}
public static boolean isKubernetesMode(ExecutionMode mode) {
- return KUBERNETES_NATIVE_SESSION.equals(mode) ||
KUBERNETES_NATIVE_APPLICATION.equals(mode);
+ return KUBERNETES_NATIVE_SESSION == mode || KUBERNETES_NATIVE_APPLICATION
== mode;
}
public static boolean isKubernetesMode(Integer value) {
@@ -104,7 +104,7 @@ public enum ExecutionMode {
}
public static boolean isKubernetesApplicationMode(Integer value) {
- return KUBERNETES_NATIVE_APPLICATION.equals(of(value));
+ return KUBERNETES_NATIVE_APPLICATION == of(value);
}
public static List<Integer> getKubernetesMode() {
@@ -113,7 +113,7 @@ public enum ExecutionMode {
}
public static boolean isSessionMode(ExecutionMode mode) {
- return KUBERNETES_NATIVE_SESSION.equals(mode) || YARN_SESSION.equals(mode);
+ return KUBERNETES_NATIVE_SESSION == mode || YARN_SESSION == mode;
}
public static boolean isRemoteMode(Integer value) {
@@ -121,6 +121,6 @@ public enum ExecutionMode {
}
public static boolean isRemoteMode(ExecutionMode mode) {
- return REMOTE.equals(mode);
+ return REMOTE == mode;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 98fd8dd53..0c1593662 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -569,7 +569,7 @@ public class Application implements Serializable {
}
private boolean needFillYarnQueueLabel(ExecutionMode mode) {
- return ExecutionMode.YARN_PER_JOB.equals(mode) ||
ExecutionMode.YARN_APPLICATION.equals(mode);
+ return ExecutionMode.YARN_PER_JOB == mode ||
ExecutionMode.YARN_APPLICATION == mode;
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 390607476..6243d6f55 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -136,7 +136,7 @@ public class FlinkCluster implements Serializable {
return Collections.emptyMap();
}
Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
- if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION == getExecutionModeEnum()) {
map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue));
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 59dc4c6f5..4875d5969 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -153,7 +153,7 @@ public enum FlinkAppState {
public static class Bridge {
/** covert from org.apache.streampark.flink.k8s.enums.FlinkJobState */
public static FlinkAppState fromK8sFlinkJobState(Enumeration.Value
flinkJobState) {
- if (FlinkJobState.K8S_INITIALIZING().equals(flinkJobState)) {
+ if (FlinkJobState.K8S_INITIALIZING() == flinkJobState) {
return INITIALIZING;
} else {
return of(flinkJobState.toString());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
index 36a19a90f..748077ff3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GitCredential.java
@@ -34,7 +34,7 @@ public enum GitCredential {
}
public static boolean isSSH(Integer gitCredential) {
- return GitCredential.SSH.equals(GitCredential.of(gitCredential));
+ return GitCredential.SSH == GitCredential.of(gitCredential);
}
public Integer getValue() {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index f854eb6a4..c4cb68a27 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -136,7 +136,7 @@ public class EnvInitializer implements ApplicationRunner {
Workspace workspace = Workspace.of(storageType);
// 1. prepare workspace dir
- if (storageType.equals(LFS)) {
+ if (LFS == storageType) {
fsOperator.mkdirsIfNotExists(Workspace.APP_LOCAL_DIST());
}
Arrays.asList(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 728f3b7f4..c17a9fd51 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -268,7 +268,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
clusterId = application.getClusterId();
} else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
@@ -438,7 +438,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
String appConf = userJarAndAppConf.f1;
BuildResult buildResult = buildPipeline.getBuildResult();
- if
(ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_APPLICATION == application.getExecutionModeEnum()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}
@@ -591,7 +591,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
? null
: String.format("yaml://%s", applicationConfig.getContent());
// 3) client
- if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+ if (ExecutionMode.YARN_APPLICATION == executionMode) {
String clientPath = Workspace.remote().APP_CLIENT();
flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
}
@@ -624,7 +624,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
ConfigFileType fileType =
ConfigFileType.of(applicationConfig.getFormat());
- if (fileType != null &&
!fileType.equals(ConfigFileType.UNKNOWN)) {
+ if (fileType != null && ConfigFileType.UNKNOWN != fileType) {
appConf =
String.format(
"%s://%s", fileType.getTypeName(),
applicationConfig.getContent());
@@ -645,7 +645,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
}
- if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
+ if (ExecutionMode.YARN_APPLICATION == executionMode) {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
flinkUserJar =
@@ -690,7 +690,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
- if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index ca7ad1e04..c4e1b27d3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -202,8 +202,8 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
envInitializer.checkFlinkEnv(application.getStorageType(), flinkEnv);
envInitializer.storageInitialize(application.getStorageType());
- if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
- || ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()
+ || ExecutionMode.REMOTE == application.getExecutionModeEnum()) {
FlinkCluster flinkCluster =
flinkClusterService.getById(application.getFlinkClusterId());
boolean conned =
flinkClusterWatcher.verifyClusterConnection(flinkCluster);
if (!conned) {
@@ -247,7 +247,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
.anyMatch(
application ->
clusterId.equals(application.getFlinkClusterId())
- &&
FlinkAppState.RUNNING.equals(application.getStateEnum()));
+ && FlinkAppState.RUNNING ==
application.getStateEnum());
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 2dc38afb2..4a46462ff 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -497,8 +497,8 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
// 4) yarn application mode change
if (!application.getBuild()) {
if (!application.getExecutionMode().equals(appParam.getExecutionMode()))
{
- if
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
- ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
+ if (ExecutionMode.YARN_APPLICATION == appParam.getExecutionModeEnum()
+ || ExecutionMode.YARN_APPLICATION ==
application.getExecutionModeEnum()) {
application.setBuild(true);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 65d6650e2..4e0bfa483 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -443,8 +443,8 @@ public class AppBuildPipeServiceImpl
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
- if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
- && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
+ if (DevelopmentMode.CUSTOM_CODE == app.getDevelopmentMode()
+ && ApplicationType.APACHE_FLINK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 5b560fc29..56f778b1c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -74,7 +74,7 @@ public class ApplicationConfigServiceImpl
if (application.getFormat() != null) {
ConfigFileType fileType = ConfigFileType.of(application.getFormat());
- if (fileType == null || ConfigFileType.UNKNOWN.equals(fileType)) {
+ if (fileType == null || ConfigFileType.UNKNOWN == fileType) {
throw new ApiAlertException(
"application' config error. must be (.properties|.yaml|.yml
|.conf)");
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 254818045..416e162b8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -141,7 +141,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
}
if (projectParam.getBuildState() != null) {
project.setBuildState(projectParam.getBuildState());
- if
(BuildState.of(projectParam.getBuildState()).equals(BuildState.NEED_REBUILD)) {
+ if (BuildState.NEED_REBUILD ==
BuildState.of(projectParam.getBuildState())) {
List<Application> applications = getApplications(project);
// Update deployment status
applications.forEach(
@@ -235,7 +235,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
Project project = getById(id);
Utils.notNull(project);
- if (!BuildState.SUCCESSFUL.equals(BuildState.of(project.getBuildState()))
+ if (BuildState.SUCCESSFUL != BuildState.of(project.getBuildState())
|| !project.getDistHome().exists()) {
return Collections.emptyList();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 24ed747c8..c5e35c8b6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -304,7 +304,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
FlinkConnector connectorResource;
ApiAlertException.throwIfFalse(
- ResourceType.CONNECTOR.equals(resourceParam.getResourceType()),
+ ResourceType.CONNECTOR == resourceParam.getResourceType(),
"getConnectorId method error, resource not flink connector.");
List<File> jars;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 16d09a246..67d609fe3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -306,7 +306,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
return application.getClusterId();
} else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION == application.getExecutionModeEnum()) {
Utils.notNull(
cluster,
String.format(
@@ -445,8 +445,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
int cpThreshold =
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
.orElse(getChkNumRetainedFromFlinkEnv(flinkEnv, application));
- cpThreshold =
- CHECKPOINT.equals(CheckPointType.of(entity.getType())) ? cpThreshold -
1 : cpThreshold;
+ cpThreshold = CHECKPOINT == CheckPointType.of(entity.getType()) ?
cpThreshold - 1 : cpThreshold;
if (cpThreshold == 0) {
LambdaQueryWrapper<SavePoint> queryWrapper =
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index ce60a1919..4bfb75127 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -230,12 +230,12 @@ public class FlinkAppHttpWatcher {
Query from flink's restAPI and yarn's restAPI both failed.
In this case, it is necessary to decide whether to return to
the final state depending on the state being operated
*/
- if (optionState == null ||
!optionState.equals(OptionState.STARTING)) {
+ if (optionState == null || OptionState.STARTING != optionState) {
// non-mapping
if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
log.error(
"FlinkAppHttpWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
- if (StopFrom.NONE.equals(stopFrom)) {
+ if (StopFrom.NONE == stopFrom) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
doAlert(application, FlinkAppState.LOST);
@@ -253,9 +253,9 @@ public class FlinkAppHttpWatcher {
cleanOptioning(optionState, id);
doPersistMetrics(application, true);
FlinkAppState appState = application.getStateEnum();
- if (appState == FlinkAppState.FAILED || appState ==
FlinkAppState.LOST) {
+ if (FlinkAppState.FAILED == appState || FlinkAppState.LOST ==
appState) {
doAlert(application, application.getStateEnum());
- if (appState.equals(FlinkAppState.FAILED)) {
+ if (FlinkAppState.FAILED == appState) {
try {
applicationActionService.start(application, true);
} catch (Exception e) {
@@ -280,8 +280,7 @@ public class FlinkAppHttpWatcher {
JobsOverview jobsOverview = httpJobsOverview(application);
Optional<JobsOverview.Job> optional;
ExecutionMode execMode = application.getExecutionModeEnum();
- if (ExecutionMode.YARN_APPLICATION.equals(execMode)
- || ExecutionMode.YARN_PER_JOB.equals(execMode)) {
+ if (ExecutionMode.YARN_APPLICATION == execMode ||
ExecutionMode.YARN_PER_JOB == execMode) {
optional =
jobsOverview.getJobs().size() > 1
? jobsOverview.getJobs().stream()
@@ -299,7 +298,7 @@ public class FlinkAppHttpWatcher {
JobsOverview.Job jobOverview = optional.get();
FlinkAppState currentState = FlinkAppState.of(jobOverview.getState());
- if (!FlinkAppState.OTHER.equals(currentState)) {
+ if (FlinkAppState.OTHER != currentState) {
try {
// 1) set info from JobOverview
handleJobOverview(application, jobOverview);
@@ -314,7 +313,7 @@ public class FlinkAppHttpWatcher {
}
// 3) savePoint obsolete check and NEED_START check
OptionState optionState = OPTIONING.get(application.getId());
- if (currentState.equals(FlinkAppState.RUNNING)) {
+ if (FlinkAppState.RUNNING == currentState) {
handleRunningState(application, optionState, currentState);
} else {
handleNotRunState(application, optionState, currentState, stopFrom);
@@ -386,7 +385,7 @@ public class FlinkAppHttpWatcher {
NEED_RESTART_AFTER_ROLLBACK (Need to restart after rollback)
NEED_RESTART_AFTER_DEPLOY (Need to rollback after deploy)
*/
- if (OptionState.STARTING.equals(optionState)) {
+ if (OptionState.STARTING == optionState) {
Application latestApp = WATCHING_APPS.get(application.getId());
ReleaseState releaseState = latestApp.getReleaseState();
switch (releaseState) {
@@ -460,8 +459,8 @@ public class FlinkAppHttpWatcher {
currentState.name());
cleanSavepoint(application);
application.setState(currentState.getValue());
- if (StopFrom.NONE.equals(stopFrom) ||
applicationInfoService.checkAlter(application)) {
- if (StopFrom.NONE.equals(stopFrom)) {
+ if (StopFrom.NONE == stopFrom ||
applicationInfoService.checkAlter(application)) {
+ if (StopFrom.NONE == stopFrom) {
log.info(
"FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not
form StreamPark,savePoint expired!");
savePointService.expire(application.getId());
@@ -512,7 +511,7 @@ public class FlinkAppHttpWatcher {
Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
if (flag != null) {
log.info("FlinkAppHttpWatcher previous state: canceling.");
- if (StopFrom.NONE.equals(stopFrom)) {
+ if (StopFrom.NONE == stopFrom) {
log.error(
"FlinkAppHttpWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
@@ -525,18 +524,18 @@ public class FlinkAppHttpWatcher {
// query the status from the yarn rest Api
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo == null) {
- if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi
failed ");
}
} else {
try {
String state = yarnAppInfo.getApp().getFinalStatus();
FlinkAppState flinkAppState = FlinkAppState.of(state);
- if (FlinkAppState.OTHER.equals(flinkAppState)) {
+ if (FlinkAppState.OTHER == flinkAppState) {
return;
}
- if (FlinkAppState.KILLED.equals(flinkAppState)) {
- if (StopFrom.NONE.equals(stopFrom)) {
+ if (FlinkAppState.KILLED == flinkAppState) {
+ if (StopFrom.NONE == stopFrom) {
log.error(
"FlinkAppHttpWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
@@ -545,24 +544,24 @@ public class FlinkAppHttpWatcher {
cleanSavepoint(application);
application.setEndTime(new Date());
}
- if (FlinkAppState.SUCCEEDED.equals(flinkAppState)) {
+ if (FlinkAppState.SUCCEEDED == flinkAppState) {
flinkAppState = FlinkAppState.FINISHED;
}
application.setState(flinkAppState.getValue());
cleanOptioning(optionState, application.getId());
doPersistMetrics(application, true);
- if (flinkAppState.equals(FlinkAppState.FAILED)
- || flinkAppState.equals(FlinkAppState.LOST)
- || (flinkAppState.equals(FlinkAppState.CANCELED) &&
StopFrom.NONE.equals(stopFrom))
+ if (FlinkAppState.FAILED == flinkAppState
+ || FlinkAppState.LOST == flinkAppState
+ || (FlinkAppState.CANCELED == flinkAppState && StopFrom.NONE ==
stopFrom)
|| applicationInfoService.checkAlter(application)) {
doAlert(application, flinkAppState);
stopCanceledJob(application.getId());
- if (flinkAppState.equals(FlinkAppState.FAILED)) {
+ if (FlinkAppState.FAILED == flinkAppState) {
applicationActionService.start(application, true);
}
}
} catch (Exception e) {
- if
(!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+ if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi
error,", e);
}
}
@@ -589,7 +588,7 @@ public class FlinkAppHttpWatcher {
}
log.info("FlinkAppHttpWatcher setOptioning");
OPTIONING.put(appId, state);
- if (state.equals(OptionState.CANCELLING)) {
+ if (OptionState.CANCELLING == state) {
STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
}
}
@@ -657,8 +656,8 @@ public class FlinkAppHttpWatcher {
private Overview httpOverview(Application application) throws IOException {
String appId = application.getAppId();
if (appId != null) {
- if
(application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
- ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
+ if (ExecutionMode.YARN_APPLICATION == application.getExecutionModeEnum()
+ || ExecutionMode.YARN_PER_JOB == application.getExecutionModeEnum())
{
String reqURL;
if (StringUtils.isBlank(application.getJobManagerUrl())) {
String format = "proxy/%s/overview";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
index 5e30557c9..440170c42 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
@@ -82,7 +82,7 @@ public class FlinkCheckpointProcessor {
CheckPointStatus status = checkPoint.getCheckPointStatus();
CheckPointKey checkPointKey = new CheckPointKey(appId, jobID,
checkPoint.getId());
- if (CheckPointStatus.COMPLETED.equals(status)) {
+ if (CheckPointStatus.COMPLETED == status) {
if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
saveSavepoint(checkPoint, application.getId());
@@ -161,9 +161,7 @@ public class FlinkCheckpointProcessor {
private boolean shouldProcessFailedTrigger(
CheckPoints.CheckPoint checkPoint, boolean cpFailedTrigger,
CheckPointStatus status) {
- return CheckPointStatus.FAILED.equals(status)
- && !checkPoint.getIsSavepoint()
- && cpFailedTrigger;
+ return CheckPointStatus.FAILED == status && !checkPoint.getIsSavepoint()
&& cpFailedTrigger;
}
private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 45560c35f..c097dfb5b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -105,10 +105,10 @@ public class FlinkK8sChangeEventListener {
// email alerts when necessary
FlinkAppState state = app.getStateEnum();
- if (FlinkAppState.FAILED.equals(state)
- || FlinkAppState.LOST.equals(state)
- || FlinkAppState.RESTARTING.equals(state)
- || FlinkAppState.FINISHED.equals(state)) {
+ if (FlinkAppState.FAILED == state
+ || FlinkAppState.LOST == state
+ || FlinkAppState.RESTARTING == state
+ || FlinkAppState.FINISHED == state) {
executor.execute(
() -> {
if (app.getProbing()) {
@@ -131,7 +131,7 @@ public class FlinkK8sChangeEventListener {
TrackId trackId = event.trackId();
ExecutionMode mode =
FlinkK8sExecuteMode.toExecutionMode(trackId.executeMode());
// discard session mode change
- if (ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(mode)) {
+ if (ExecutionMode.KUBERNETES_NATIVE_SESSION == mode) {
return;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 2d5a1a339..6c82fe142 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -127,14 +127,14 @@ public class FlinkK8sWatcherWrapper {
// covert Application to TrackId
public static TrackId toTrackId(@Nonnull Application app) {
Enumeration.Value mode =
FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
- if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
+ if (FlinkK8sExecuteMode.APPLICATION() == mode) {
return TrackId.onApplication(
app.getK8sNamespace(),
app.getClusterId(),
app.getId(),
app.getJobId(),
app.getTeamId().toString());
- } else if (FlinkK8sExecuteMode.SESSION().equals(mode)) {
+ } else if (FlinkK8sExecuteMode.SESSION() == mode) {
return TrackId.onSession(
app.getK8sNamespace(),
app.getClusterId(),
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
index fef5064d2..4ce0aaca9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
@@ -66,7 +66,7 @@ public class MenuServiceImpl extends ServiceImpl<MenuMapper,
Menu> implements Me
new IllegalArgumentException(
String.format("The userId [%s] not found", userId)));
// Admin has the permission for all menus.
- if (UserType.ADMIN.equals(user.getUserType())) {
+ if (UserType.ADMIN == user.getUserType()) {
return
this.list().stream().map(Menu::getPerms).collect(Collectors.toList());
}
return this.baseMapper.findUserPermissions(userId, teamId);
@@ -81,7 +81,7 @@ public class MenuServiceImpl extends ServiceImpl<MenuMapper,
Menu> implements Me
new IllegalArgumentException(
String.format("The userId:[%s] not found", userId)));
// Admin has the permission for all menus.
- if (UserType.ADMIN.equals(user.getUserType())) {
+ if (UserType.ADMIN == user.getUserType()) {
LambdaQueryWrapper<Menu> queryWrapper =
new LambdaQueryWrapper<Menu>().eq(Menu::getType,
"0").orderByAsc(Menu::getOrderNum);
return this.list(queryWrapper);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
index 9d49c5c1a..f89c3b398 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
@@ -143,7 +143,7 @@ public class TeamServiceImpl extends
ServiceImpl<TeamMapper, Team> implements Te
.orElseThrow(
() -> new ApiAlertException(String.format("The userId [%s] not
found.", userId)));
// Admin has the permission for all teams.
- if (UserType.ADMIN.equals(user.getUserType())) {
+ if (UserType.ADMIN == user.getUserType()) {
return this.list();
}
return baseMapper.findUserTeams(userId);
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index d3608e7cf..372251d77 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -212,7 +212,7 @@ object YarnSessionClient extends YarnClientTrait {
clusterDescriptor.getYarnClient
.getApplicationReport(ApplicationId.fromString(deployRequest.clusterId))
.getFinalApplicationStatus
- if (FinalApplicationStatus.UNDEFINED.equals(applicationStatus)) {
+ if (FinalApplicationStatus.UNDEFINED == applicationStatus) {
// application is running
val yarnClient = clusterDescriptor
.retrieve(ApplicationId.fromString(deployRequest.clusterId))
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 71c4456d3..839520eba 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -514,8 +514,8 @@ trait FlinkClientTrait extends Logger {
}
if (
- submitRequest.developmentMode == DevelopmentMode.PYFLINK &&
!submitRequest.executionMode
- .equals(ExecutionMode.YARN_APPLICATION)
+ submitRequest.developmentMode == DevelopmentMode.PYFLINK
+ && submitRequest.executionMode != ExecutionMode.YARN_APPLICATION
) {
// python file
programArgs.add("-py")