This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ac1bc0bae [improvement] improvement flink cluster
verifyClusterConnection (#2903)
ac1bc0bae is described below
commit ac1bc0bae26bf4ab302b1a71848494e9e5b70687
Author: xujiangfeng001 <[email protected]>
AuthorDate: Sat Jul 29 15:08:50 2023 +0800
[improvement] improvement flink cluster verifyClusterConnection (#2903)
---
.../console/core/entity/FlinkCluster.java | 48 ----------------------
.../core/service/impl/ApplicationServiceImpl.java | 5 ++-
.../core/service/impl/FlinkClusterServiceImpl.java | 8 ++--
.../console/core/task/FlinkClusterWatcher.java | 46 ++++++++++++++-------
.../console/core/task/FlinkHttpWatcher.java | 4 +-
5 files changed, 41 insertions(+), 70 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 32feec747..9684c8406 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -24,10 +24,7 @@ import
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.commons.lang3.StringUtils;
@@ -156,51 +153,6 @@ public class FlinkCluster implements Serializable {
return null;
}
- /**
- * Verify the cluster connection whether is valid.
- *
- * @return <code>false</code> if the connection of the cluster is invalid,
<code>true</code> else.
- */
- public boolean verifyClusterConnection() {
- if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
- if (address == null) {
- return false;
- }
- // 1) check url is Legal
- if (!CommonUtils.isLegalUrl(address)) {
- return false;
- }
- // 2) check connection
- try {
- String restUrl = address + "/overview";
- String result =
- HttpClientUtils.httpGetRequest(
- restUrl,
- RequestConfig.custom().setConnectTimeout(2000,
TimeUnit.MILLISECONDS).build());
- JacksonUtils.read(result, Overview.class);
- return true;
- } catch (Exception ignored) {
- //
- }
- return false;
- }
- if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
- try {
- String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" +
this.clusterId + "/overview";
- String result =
- HttpClientUtils.httpGetRequest(
- restUrl,
- RequestConfig.custom().setConnectTimeout(2000,
TimeUnit.MILLISECONDS).build());
- JacksonUtils.read(result, Overview.class);
- return true;
- } catch (Exception ignored) {
- //
- }
- return false;
- }
- return false;
- }
-
@JsonIgnore
public Map<String, String> getFlinkConfig() throws JsonProcessingException {
String restUrl = this.address + "/jobmanager/config";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 886756770..7c9140dc1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,6 +78,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkClusterWatcher;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -216,6 +217,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private YarnQueueService yarnQueueService;
+ @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -430,7 +433,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
|| ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
FlinkCluster flinkCluster =
flinkClusterService.getById(application.getFlinkClusterId());
- boolean conned = flinkCluster.verifyClusterConnection();
+ boolean conned =
flinkClusterWatcher.verifyClusterConnection(flinkCluster);
if (!conned) {
throw new ApiAlertException("the target cluster is unavailable,
please check!");
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f3dda6a56..adb74ce41 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -94,6 +94,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Autowired private YarnQueueService yarnQueueService;
+ @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
@@ -118,14 +120,14 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
// 3) Check connection
if (ExecutionMode.isRemoteMode(cluster.getExecutionModeEnum())
- && !cluster.verifyClusterConnection()) {
+ && !flinkClusterWatcher.verifyClusterConnection(cluster)) {
result.setMsg("The remote cluster connection failed, please check!");
result.setStatus(3);
return result;
}
if (ExecutionMode.isYarnMode(cluster.getExecutionModeEnum())
&& cluster.getClusterId() != null
- && !cluster.verifyClusterConnection()) {
+ && !flinkClusterWatcher.verifyClusterConnection(cluster)) {
result.setMsg("The flink cluster connection failed, please check!");
result.setStatus(4);
return result;
@@ -411,7 +413,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ApiAlertException.throwIfFalse(
ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
- if (!flinkCluster.verifyClusterConnection()) {
+ if (!flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
flinkCluster.setClusterState(ClusterState.LOST.getValue());
updateById(flinkCluster);
throw new ApiAlertException("Current cluster is not active, please
check!");
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 9b256742c..3762acd37 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -148,17 +148,7 @@ public class FlinkClusterWatcher {
if (state != null) {
return state;
}
- switch (flinkCluster.getExecutionModeEnum()) {
- case REMOTE:
- state = httpRemoteClusterState(flinkCluster);
- break;
- case YARN_SESSION:
- state = httpYarnSessionClusterState(flinkCluster);
- break;
- default:
- state = ClusterState.UNKNOWN;
- break;
- }
+ state = httpClusterState(flinkCluster);
if (ClusterState.isRunning(state)) {
FAILED_STATES.invalidate(flinkCluster.getId());
} else {
@@ -192,6 +182,23 @@ public class FlinkClusterWatcher {
return state;
}
+ /**
+ * Resolve the state of a flink cluster by dispatching on its execution mode.
+ *
+ * @param flinkCluster the cluster whose execution mode selects the state lookup
+ * @return the REMOTE or YARN_SESSION lookup result, {@link ClusterState#UNKNOWN} otherwise
+ */
+ private ClusterState httpClusterState(FlinkCluster flinkCluster) {
+ switch (flinkCluster.getExecutionModeEnum()) {
+ case REMOTE:
+ return httpRemoteClusterState(flinkCluster);
+ case YARN_SESSION:
+ return httpYarnSessionClusterState(flinkCluster);
+ default:
+ return ClusterState.UNKNOWN;
+ }
+ }
+
/**
* cluster get state from flink rest api
*
@@ -272,9 +279,18 @@ public class FlinkClusterWatcher {
* @return
*/
private ClusterState yarnStateConvertClusterState(YarnApplicationState
state) {
- if (state == YarnApplicationState.FINISHED) {
- return ClusterState.CANCELED;
- }
- return ClusterState.of(state.toString());
+ return state == YarnApplicationState.FINISHED
+ ? ClusterState.CANCELED
+ : ClusterState.of(state.toString());
+ }
+
+ /**
+ * Verify whether the connection of the given cluster is valid.
+ *
+ * @param flinkCluster the cluster to check; <code>null</code> is reported as not connected
+ * @return <code>true</code> only for a non-null cluster whose state is running.
+ */
+ public Boolean verifyClusterConnection(FlinkCluster flinkCluster) {
+ return flinkCluster != null && ClusterState.isRunning(httpClusterState(flinkCluster));
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index f6ed5c96c..26302387a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -17,7 +17,6 @@
package org.apache.streampark.console.core.task;
-import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
@@ -784,8 +783,7 @@ public class FlinkHttpWatcher {
case YARN_SESSION:
case REMOTE:
FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
- ClusterState clusterState =
flinkClusterWatcher.getClusterState(flinkCluster);
- if (ClusterState.isRunning(clusterState)) {
+ if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
log.info(
"application with id {} is yarn session or remote and flink
cluster with id {} is alive, application send alert",
app.getId(),