This is an automated email from the ASF dual-hosted git repository.
muchunjin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new fe3f05c40 flink cluster improvement (#2859)
fe3f05c40 is described below
commit fe3f05c40a19bb1a4b1d340b4a5ed76089b60c74
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 16 16:20:40 2023 +0800
flink cluster improvement (#2859)
---
.../streampark/common/enums/ClusterState.java | 8 ----
.../streampark/common/util/HadoopUtils.scala | 6 ++-
.../console/core/service/FlinkClusterService.java | 4 --
.../core/service/impl/FlinkClusterServiceImpl.java | 40 ++++++++++---------
.../console/core/task/FlinkClusterWatcher.java | 46 +++++++++-------------
5 files changed, 44 insertions(+), 60 deletions(-)
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 7794f7f95..2aef409c1 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -67,12 +67,4 @@ public enum ClusterState implements Serializable {
public static boolean isRunning(ClusterState state) {
return RUNNING.equals(state);
}
-
- public static boolean isFailed(ClusterState state) {
- return state == ClusterState.FAILED
- || state == ClusterState.LOST
- || state == ClusterState.UNKNOWN
- || state == ClusterState.KILLED
- || state == ClusterState.CANCELED;
- }
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 7616952b8..b9823d8f0 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.service.Service.STATE
-import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -290,6 +290,10 @@ object HadoopUtils extends Logger {
new File(destPath.toString).getAbsolutePath
}
+ def toYarnState(state: String): YarnApplicationState = {
+ YarnApplicationState.values.find(_.name() == state).orNull
+ }
+
private class HadoopConfiguration extends Configuration {
private lazy val rewriteNames = List(
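The new toYarnState helper simply resolves a YARN state string to its enum constant, returning null for unrecognized values. As a rough illustration, an equivalent lookup written on the Java side (the class and method names below are placeholders, not part of this commit) might look like:

import java.util.Arrays;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;

// Sketch only: mirrors HadoopUtils.toYarnState, returning null when the
// string does not match any YarnApplicationState constant.
final class YarnStateLookup {
  static YarnApplicationState toYarnState(String state) {
    return Arrays.stream(YarnApplicationState.values())
        .filter(s -> s.name().equals(state))
        .findFirst()
        .orElse(null);
  }
}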
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 1c62806db..5f47c60a5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -37,12 +37,8 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
void update(FlinkCluster flinkCluster);
- void starting(FlinkCluster flinkCluster);
-
void start(FlinkCluster flinkCluster);
- void canceling(FlinkCluster flinkCluster);
-
void shutdown(FlinkCluster flinkCluster);
Boolean existsByClusterId(String clusterId, Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index c5eba5b3f..f3dda6a56 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -155,18 +155,11 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
return ret;
}
- @Override
- public void starting(FlinkCluster flinkCluster) {
- flinkCluster.setClusterState(ClusterState.STARTING.getValue());
- flinkCluster.setStartTime(new Date());
- updateById(flinkCluster);
- }
-
@Override
@Transactional(rollbackFor = {Exception.class})
public void start(FlinkCluster cluster) {
FlinkCluster flinkCluster = getById(cluster.getId());
- starting(flinkCluster);
+ updateClusterState(cluster.getId(), ClusterState.STARTING);
try {
DeployResponse deployResponse = deployInternal(flinkCluster);
ApiAlertException.throwIfNull(
@@ -185,8 +178,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
flinkCluster.setException(null);
flinkCluster.setEndTime(null);
- FlinkClusterWatcher.addWatching(flinkCluster);
updateById(flinkCluster);
+ FlinkClusterWatcher.addWatching(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setClusterState(ClusterState.FAILED.getValue());
@@ -228,12 +221,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
updateById(flinkCluster);
}
- @Override
- public void canceling(FlinkCluster flinkCluster) {
- flinkCluster.setClusterState(ClusterState.CANCELING.getValue());
- updateById(flinkCluster);
- }
-
@Override
public void shutdown(FlinkCluster cluster) {
FlinkCluster flinkCluster = this.getById(cluster.getId());
@@ -250,15 +237,15 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
ApiAlertException.throwIfTrue(
existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
- canceling(flinkCluster);
+ updateClusterState(flinkCluster.getId(), ClusterState.CANCELING);
try {
// 4) shutdown
ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
flinkCluster.setEndTime(new Date());
- FlinkClusterWatcher.unWatching(flinkCluster);
updateById(flinkCluster);
+ FlinkClusterWatcher.unWatching(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setException(e.toString());
@@ -302,8 +289,23 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
LambdaUpdateWrapper<FlinkCluster> updateWrapper =
new LambdaUpdateWrapper<FlinkCluster>()
.eq(FlinkCluster::getId, id)
- .set(FlinkCluster::getClusterState, state.getValue())
- .set(FlinkCluster::getEndTime, new Date());
+ .set(FlinkCluster::getClusterState, state.getValue());
+
+ switch (state) {
+ case KILLED:
+ case UNKNOWN:
+ case LOST:
+ case FAILED:
+ case CANCELED:
+ updateWrapper.set(FlinkCluster::getEndTime, new Date());
+ break;
+ case STARTING:
+ updateWrapper.set(FlinkCluster::getStartTime, new Date());
+ break;
+ default:
+ break;
+ }
+
update(updateWrapper);
}
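With starting() and canceling() removed, both lifecycle paths above funnel their transitions through updateClusterState, which also stamps startTime when entering STARTING and endTime for terminal states. A hypothetical caller-side sketch follows; the wrapper class and field names are placeholders, and it assumes updateClusterState is exposed on the FlinkClusterService interface, as FlinkClusterWatcher's usage below suggests:

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.core.service.FlinkClusterService;

// Illustrative only: this class is not part of the commit.
class ClusterStateTransitions {
  private final FlinkClusterService flinkClusterService;

  ClusterStateTransitions(FlinkClusterService flinkClusterService) {
    this.flinkClusterService = flinkClusterService;
  }

  void markStarting(Long clusterId) {
    // STARTING stamps startTime; FAILED/LOST/UNKNOWN/KILLED/CANCELED stamp
    // endTime; any other state only updates the clusterState column.
    flinkClusterService.updateClusterState(clusterId, ClusterState.STARTING);
  }
}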
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index b6351fd7f..9b256742c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.task;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
@@ -111,10 +112,17 @@ public class FlinkClusterWatcher {
EXECUTOR.execute(
() -> {
ClusterState state = getClusterState(flinkCluster);
- if (ClusterState.isFailed(state)) {
- flinkClusterService.updateClusterState(flinkCluster.getId(), state);
- unWatching(flinkCluster);
- alert(flinkCluster, state);
+ switch (state) {
+ case FAILED:
+ case LOST:
+ case UNKNOWN:
+ case KILLED:
+ flinkClusterService.updateClusterState(flinkCluster.getId(), state);
+ unWatching(flinkCluster);
+ alert(flinkCluster, state);
+ break;
+ default:
+ break;
}
}));
}
@@ -177,8 +185,8 @@ public class FlinkClusterWatcher {
* @return
*/
private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
- final ClusterState state = getStateFromFlinkRestApi(flinkCluster);
- if (ClusterState.isFailed(state)) {
+ ClusterState state = getStateFromFlinkRestApi(flinkCluster);
+ if (ClusterState.LOST == state) {
return getStateFromYarnRestApi(flinkCluster);
}
return state;
@@ -191,12 +199,9 @@ public class FlinkClusterWatcher {
* @return
*/
private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
- final String address = flinkCluster.getAddress();
- if (StringUtils.isEmpty(address)) {
- return ClusterState.CANCELED;
- }
- final String jobManagerUrl = flinkCluster.getJobManagerUrl();
- final String flinkUrl =
+ String address = flinkCluster.getAddress();
+ String jobManagerUrl = flinkCluster.getJobManagerUrl();
+ String flinkUrl =
StringUtils.isEmpty(jobManagerUrl)
? address.concat("/overview")
: jobManagerUrl.concat("/overview");
@@ -227,7 +232,7 @@ public class FlinkClusterWatcher {
return ClusterState.UNKNOWN;
}
YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
- YarnApplicationState status = stringConvertYarnState(yarnAppInfo.getApp().getState());
+ YarnApplicationState status = HadoopUtils.toYarnState(yarnAppInfo.getApp().getState());
if (status == null) {
log.error(
"cluster id:{} final application status convert failed, invalid
string ",
@@ -260,21 +265,6 @@ public class FlinkClusterWatcher {
}
}
- /**
- * string converse yarn application state
- *
- * @param value
- * @return
- */
- private YarnApplicationState stringConvertYarnState(String value) {
- for (YarnApplicationState state : YarnApplicationState.values()) {
- if (state.name().equals(value)) {
- return state;
- }
- }
- return null;
- }
-
/**
* yarn application state convert cluster state
*