This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 39dfd17ef [Improve] Improve streampark-console module base on 3.6
Control/Condition Statements (#3328)
39dfd17ef is described below
commit 39dfd17ef76ae1d7a52ba689515a242a512c4935
Author: eagleli123 <[email protected]>
AuthorDate: Sat Nov 25 21:38:51 2023 +0800
[Improve] Improve streampark-console module base on 3.6 Control/Condition
Statements (#3328)
---
.../interceptor/PostgreSQLQueryInterceptor.java | 13 ++---
.../streampark/console/base/util/ObjectUtils.java | 6 +-
.../core/controller/ApplicationController.java | 3 +-
.../controller/FlinkPodTemplateController.java | 19 +++----
.../core/controller/FlinkSqlController.java | 3 +-
.../console/core/entity/Application.java | 8 +--
.../console/core/enums/FlinkAppStateEnum.java | 3 +-
.../console/core/metrics/flink/CheckPoints.java | 3 +-
.../impl/ApplicationInfoServiceImpl.java | 4 +-
.../impl/ApplicationManageServiceImpl.java | 66 +++++++++++-----------
.../core/service/impl/AppBuildPipeServiceImpl.java | 6 +-
.../core/service/impl/FlinkEnvServiceImpl.java | 5 +-
.../core/service/impl/ProjectServiceImpl.java | 3 +-
.../core/service/impl/SavePointServiceImpl.java | 6 +-
.../core/service/impl/SqlCompleteServiceImpl.java | 3 +-
.../console/core/watcher/FlinkAppHttpWatcher.java | 30 +++++-----
16 files changed, 81 insertions(+), 100 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java
index 80a8ce678..4933868ac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/mybatis/interceptor/PostgreSQLQueryInterceptor.java
@@ -62,15 +62,10 @@ public class PostgreSQLQueryInterceptor implements
Interceptor {
RowBounds rowBounds = (RowBounds) args[2];
ResultHandler<?> resultHandler = (ResultHandler<?>) args[3];
Executor executor = (Executor) invocation.getTarget();
- CacheKey cacheKey;
- BoundSql boundSql;
- if (args.length == 4) {
- boundSql = ms.getBoundSql(parameter);
- cacheKey = executor.createCacheKey(ms, parameter, rowBounds, boundSql);
- } else {
- cacheKey = (CacheKey) args[4];
- boundSql = (BoundSql) args[5];
- }
+ boolean fourLen = args.length == 4;
+ BoundSql boundSql = fourLen ? ms.getBoundSql(parameter) : (BoundSql)
args[5];
+ CacheKey cacheKey =
+ fourLen ? executor.createCacheKey(ms, parameter, rowBounds, boundSql)
: (CacheKey) args[4];
return executor.query(ms, parameter, rowBounds, resultHandler, cacheKey,
boundSql);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
index dc8a7b2a3..90b3f9723 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
@@ -25,10 +25,8 @@ public final class ObjectUtils {
public static boolean trimEquals(Object o1, Object o2) {
boolean equals = Objects.deepEquals(o1, o2);
- if (!equals) {
- if (o1 instanceof String && o2 instanceof String) {
- return o1.toString().trim().equals(o2.toString().trim());
- }
+ if (!equals && o1 instanceof String && o2 instanceof String) {
+ return o1.toString().trim().equals(o2.toString().trim());
}
return equals;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 788992e90..f3c6f1fe3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -405,9 +405,8 @@ public class ApplicationController {
String error = applicationInfoService.checkSavepointPath(app);
if (error == null) {
return RestResponse.success(true);
- } else {
- return RestResponse.success(false).message(error);
}
+ return RestResponse.success(false).message(error);
}
@Operation(summary = "Get application on k8s deploy logs")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java
index abf2385a0..9b7805ac8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkPodTemplateController.java
@@ -74,18 +74,15 @@ public class FlinkPodTemplateController {
private Map<String, String> covertHostsParamToMap(String hosts) {
if (StringUtils.isBlank(hosts)) {
return new HashMap<>(0);
- } else {
- return Arrays.stream(hosts.split(","))
- .filter(StringUtils::isNotBlank)
- .map(String::trim)
- .map(e -> e.split(":"))
- .filter(
- arr ->
- arr.length == 2
- && StringUtils.isNotBlank(arr[0])
- && StringUtils.isNotBlank(arr[1]))
- .collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}
+ return Arrays.stream(hosts.split(","))
+ .filter(StringUtils::isNotBlank)
+ .map(String::trim)
+ .map(e -> e.split(":"))
+ .filter(
+ arr ->
+ arr.length == 2 && StringUtils.isNotBlank(arr[0]) &&
StringUtils.isNotBlank(arr[1]))
+ .collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
}
@Operation(summary = "Extract host-alias from pod template")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
index 2de639c51..73e3db332 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
@@ -82,9 +82,8 @@ public class FlinkSqlController {
.put(END, flinkSqlValidationResult.errorLine() + 1);
}
return response;
- } else {
- return RestResponse.success(true);
}
+ return RestResponse.success(true);
}
@Operation(summary = "List the application sql")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 09deab73d..ba8bc259b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -363,10 +363,10 @@ public class Application implements Serializable {
}
public boolean eqFlinkJob(Application other) {
- if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
- if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
- return this.getDependencyObject().equals(other.getDependencyObject());
- }
+ if (this.isFlinkSqlJob()
+ && other.isFlinkSqlJob()
+ && this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
+ return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
index 77963c321..151484f75 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppStateEnum.java
@@ -155,9 +155,8 @@ public enum FlinkAppStateEnum {
public static FlinkAppStateEnum fromK8sFlinkJobState(Enumeration.Value
flinkJobState) {
if (FlinkJobStateEnum.K8S_INITIALIZING() == flinkJobState) {
return INITIALIZING;
- } else {
- return of(flinkJobState.toString());
}
+ return of(flinkJobState.toString());
}
/** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
index 812d84ea3..56f75186b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java
@@ -80,7 +80,8 @@ public class CheckPoints implements Serializable {
public CheckPointTypeEnum getCheckPointType() {
if ("CHECKPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.CHECKPOINT;
- } else if ("SAVEPOINT".equals(this.checkpointType)) {
+ }
+ if ("SAVEPOINT".equals(this.checkpointType)) {
return CheckPointTypeEnum.SAVEPOINT;
}
return CheckPointTypeEnum.SYNC_SAVEPOINT;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index b8f8b8a4b..176da5fd7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -403,7 +403,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
- else if
(FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
+ if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&&
k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsStateEnum.IN_KUBERNETES;
}
@@ -419,7 +419,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
return AppExistsStateEnum.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
- else if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
+ if (FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam)))
{
return AppExistsStateEnum.IN_KUBERNETES;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 8c5bce437..247ff807a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -239,16 +239,15 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
Page<Application> page = new
MybatisPager<Application>().getDefaultPage(request);
- if (ArrayUtils.isNotEmpty(appParam.getStateArray())) {
- if (Arrays.stream(appParam.getStateArray())
- .anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
- Integer[] newArray =
- ArrayUtils.insert(
- appParam.getStateArray().length,
- appParam.getStateArray(),
- FlinkAppStateEnum.POS_TERMINATED.getValue());
- appParam.setStateArray(newArray);
- }
+ if (ArrayUtils.isNotEmpty(appParam.getStateArray())
+ && Arrays.stream(appParam.getStateArray())
+ .anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
+ Integer[] newArray =
+ ArrayUtils.insert(
+ appParam.getStateArray().length,
+ appParam.getStateArray(),
+ FlinkAppStateEnum.POS_TERMINATED.getValue());
+ appParam.setStateArray(newArray);
}
this.baseMapper.selectPage(page, appParam);
List<Application> records = page.getRecords();
@@ -492,21 +491,8 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
// 2) k8s podTemplate changed.
- if (application.getBuild()
- && FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
- if (ObjectUtils.trimNoEquals(
- application.getK8sRestExposedType(),
appParam.getK8sRestExposedType())
- || ObjectUtils.trimNoEquals(
- application.getK8sJmPodTemplate(),
appParam.getK8sJmPodTemplate())
- || ObjectUtils.trimNoEquals(
- application.getK8sTmPodTemplate(),
appParam.getK8sTmPodTemplate())
- || ObjectUtils.trimNoEquals(
- application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
- || ObjectUtils.trimNoEquals(
- application.getK8sHadoopIntegration(),
appParam.getK8sHadoopIntegration())
- || ObjectUtils.trimNoEquals(application.getFlinkImage(),
appParam.getFlinkImage())) {
- application.setBuild(true);
- }
+ if (application.getBuild() && isK8sPodTemplateChanged(application,
appParam)) {
+ application.setBuild(true);
}
// 3) flink version changed
@@ -516,13 +502,8 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
// 4) yarn application mode change
- if (!application.getBuild()) {
- if (!application.getExecutionMode().equals(appParam.getExecutionMode()))
{
- if (FlinkExecutionMode.YARN_APPLICATION ==
appParam.getFlinkExecutionMode()
- || FlinkExecutionMode.YARN_APPLICATION ==
application.getFlinkExecutionMode()) {
- application.setBuild(true);
- }
- }
+ if (!application.getBuild() && isYarnApplicationModeChange(application,
appParam)) {
+ application.setBuild(true);
}
appParam.setJobType(application.getJobType());
@@ -841,4 +822,25 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
return
FlinkExecutionMode.isYarnPerJobOrAppMode(application.getFlinkExecutionMode())
&& !yarnQueueService.isDefaultQueue(application.getYarnQueue());
}
+
+ private boolean isK8sPodTemplateChanged(Application application, Application
appParam) {
+ return FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
+ && (ObjectUtils.trimNoEquals(
+ application.getK8sRestExposedType(),
appParam.getK8sRestExposedType())
+ || ObjectUtils.trimNoEquals(
+ application.getK8sJmPodTemplate(),
appParam.getK8sJmPodTemplate())
+ || ObjectUtils.trimNoEquals(
+ application.getK8sTmPodTemplate(),
appParam.getK8sTmPodTemplate())
+ || ObjectUtils.trimNoEquals(
+ application.getK8sPodTemplates(),
appParam.getK8sPodTemplates())
+ || ObjectUtils.trimNoEquals(
+ application.getK8sHadoopIntegration(),
appParam.getK8sHadoopIntegration())
+ || ObjectUtils.trimNoEquals(application.getFlinkImage(),
appParam.getFlinkImage()));
+ }
+
+ private boolean isYarnApplicationModeChange(Application application,
Application appParam) {
+ return !application.getExecutionMode().equals(appParam.getExecutionMode())
+ && (FlinkExecutionMode.YARN_APPLICATION ==
appParam.getFlinkExecutionMode()
+ || FlinkExecutionMode.YARN_APPLICATION ==
application.getFlinkExecutionMode());
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index c04829104..db827a626 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -517,9 +517,8 @@ public class AppBuildPipeServiceImpl
log.info("Submit params to building pipeline : {}",
k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return
FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
- } else {
- return
FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
}
+ return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " +
app.getFlinkExecutionMode());
@@ -602,9 +601,8 @@ public class AppBuildPipeServiceImpl
AppBuildPipeline old = getById(pipe.getAppId());
if (old == null) {
return save(pipe);
- } else {
- return updateById(pipe);
}
+ return updateById(pipe);
}
private void checkOrElseUploadJar(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 1883c36a5..988917d56 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -133,10 +133,7 @@ public class FlinkEnvServiceImpl extends
ServiceImpl<FlinkEnvMapper, FlinkEnv>
@Override
public FlinkEnv getByIdOrDefault(Long id) {
FlinkEnv flinkEnv = getById(id);
- if (flinkEnv == null) {
- return getDefault();
- }
- return flinkEnv;
+ return flinkEnv == null ? getDefault() : flinkEnv;
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index f434ef946..e6b28abac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -109,9 +109,8 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
if (status) {
return response.message("Add project successfully").data(true);
- } else {
- return response.message("Add project failed").data(false);
}
+ return response.message("Add project failed").data(false);
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 608ab0280..60afb07c1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -307,7 +307,8 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
private String getClusterId(Application application, FlinkCluster cluster) {
if (FlinkExecutionMode.isKubernetesMode(application.getExecutionMode())) {
return application.getClusterId();
- } else if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
+ }
+ if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION ==
application.getFlinkExecutionMode()) {
Utils.notNull(
cluster,
@@ -315,9 +316,8 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
"The yarn session clusterId=%s cannot be find, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
return cluster.getClusterId();
- } else {
- return application.getAppId();
}
+ return application.getAppId();
}
return null;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index d12a2c856..b98bdd7f8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -82,9 +82,8 @@ public class SqlCompleteServiceImpl implements
SqlCompleteService {
// This step is very critical. If the same name is not judged and the
count is different,
// then the set collection will default to the same element, and it
will be overwritten.
return this.word.compareTo(other.word) * -1;
- } else {
- return num * -1;
}
+ return num * -1;
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 52e9de29a..033631e6c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -436,10 +436,9 @@ public class FlinkAppHttpWatcher {
if (application.getStartTime() == null || startTime !=
application.getStartTime().getTime()) {
application.setStartTime(new Date(startTime));
}
- if (endTime != -1) {
- if (application.getEndTime() == null || endTime !=
application.getEndTime().getTime()) {
- application.setEndTime(new Date(endTime));
- }
+ if (endTime != -1
+ && (application.getEndTime() == null || endTime !=
application.getEndTime().getTime())) {
+ application.setEndTime(new Date(endTime));
}
application.setJobId(jobOverview.getId());
@@ -676,19 +675,18 @@ public class FlinkAppHttpWatcher {
private Overview httpOverview(Application application) throws IOException {
String appId = application.getAppId();
- if (appId != null) {
- if (FlinkExecutionMode.YARN_APPLICATION ==
application.getFlinkExecutionMode()
- || FlinkExecutionMode.YARN_PER_JOB ==
application.getFlinkExecutionMode()) {
- String reqURL;
- if (StringUtils.isBlank(application.getJobManagerUrl())) {
- String format = "proxy/%s/overview";
- reqURL = String.format(format, appId);
- } else {
- String format = "%s/overview";
- reqURL = String.format(format, application.getJobManagerUrl());
- }
- return yarnRestRequest(reqURL, Overview.class);
+ if (appId != null
+ && (FlinkExecutionMode.YARN_APPLICATION ==
application.getFlinkExecutionMode()
+ || FlinkExecutionMode.YARN_PER_JOB ==
application.getFlinkExecutionMode())) {
+ String reqURL;
+ if (StringUtils.isBlank(application.getJobManagerUrl())) {
+ String format = "proxy/%s/overview";
+ reqURL = String.format(format, appId);
+ } else {
+ String format = "%s/overview";
+ reqURL = String.format(format, application.getJobManagerUrl());
}
+ return yarnRestRequest(reqURL, Overview.class);
}
return null;
}