This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit a751c6960d89a065c8b652cd886d9bf3cb9510f2
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 15 21:33:49 2023 +0800

    flink cluster improvement
---
 .../streampark/common/enums/ClusterState.java      |  8 ----
 .../streampark/common/util/HadoopUtils.scala       |  6 ++-
 .../console/core/service/FlinkClusterService.java  |  4 --
 .../core/service/impl/FlinkClusterServiceImpl.java | 40 ++++++++++---------
 .../console/core/task/FlinkClusterWatcher.java     | 46 +++++++++-------------
 5 files changed, 44 insertions(+), 60 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 7794f7f95..2aef409c1 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -67,12 +67,4 @@ public enum ClusterState implements Serializable {
   public static boolean isRunning(ClusterState state) {
     return RUNNING.equals(state);
   }
-
-  public static boolean isFailed(ClusterState state) {
-    return state == ClusterState.FAILED
-        || state == ClusterState.LOST
-        || state == ClusterState.UNKNOWN
-        || state == ClusterState.KILLED
-        || state == ClusterState.CANCELED;
-  }
 }
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 7616952b8..b9823d8f0 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.service.Service.STATE
-import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
@@ -290,6 +290,10 @@ object HadoopUtils extends Logger {
     new File(destPath.toString).getAbsolutePath
   }
 
+  def toYarnState(state: String): YarnApplicationState = {
+    YarnApplicationState.values.find(_.name() == state).orNull
+  }
+
   private class HadoopConfiguration extends Configuration {
 
     private lazy val rewriteNames = List(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 1c62806db..5f47c60a5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -37,12 +37,8 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
   void update(FlinkCluster flinkCluster);
 
-  void starting(FlinkCluster flinkCluster);
-
   void start(FlinkCluster flinkCluster);
 
-  void canceling(FlinkCluster flinkCluster);
-
   void shutdown(FlinkCluster flinkCluster);
 
   Boolean existsByClusterId(String clusterId, Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index c5eba5b3f..f3dda6a56 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -155,18 +155,11 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     return ret;
   }
 
-  @Override
-  public void starting(FlinkCluster flinkCluster) {
-    flinkCluster.setClusterState(ClusterState.STARTING.getValue());
-    flinkCluster.setStartTime(new Date());
-    updateById(flinkCluster);
-  }
-
   @Override
   @Transactional(rollbackFor = {Exception.class})
   public void start(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
-    starting(flinkCluster);
+    updateClusterState(cluster.getId(), ClusterState.STARTING);
     try {
       DeployResponse deployResponse = deployInternal(flinkCluster);
       ApiAlertException.throwIfNull(
@@ -185,8 +178,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
       flinkCluster.setException(null);
       flinkCluster.setEndTime(null);
-      FlinkClusterWatcher.addWatching(flinkCluster);
       updateById(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setClusterState(ClusterState.FAILED.getValue());
@@ -228,12 +221,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     updateById(flinkCluster);
   }
 
-  @Override
-  public void canceling(FlinkCluster flinkCluster) {
-    flinkCluster.setClusterState(ClusterState.CANCELING.getValue());
-    updateById(flinkCluster);
-  }
-
   @Override
   public void shutdown(FlinkCluster cluster) {
     FlinkCluster flinkCluster = this.getById(cluster.getId());
@@ -250,15 +237,15 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
 
     ApiAlertException.throwIfTrue(
         existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
-    canceling(flinkCluster);
+    updateClusterState(flinkCluster.getId(), ClusterState.CANCELING);
     try {
       // 4) shutdown
       ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
       flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
       flinkCluster.setEndTime(new Date());
-      FlinkClusterWatcher.unWatching(flinkCluster);
       updateById(flinkCluster);
+      FlinkClusterWatcher.unWatching(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setException(e.toString());
@@ -302,8 +289,23 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     LambdaUpdateWrapper<FlinkCluster> updateWrapper =
         new LambdaUpdateWrapper<FlinkCluster>()
             .eq(FlinkCluster::getId, id)
-            .set(FlinkCluster::getClusterState, state.getValue())
-            .set(FlinkCluster::getEndTime, new Date());
+            .set(FlinkCluster::getClusterState, state.getValue());
+
+    switch (state) {
+      case KILLED:
+      case UNKNOWN:
+      case LOST:
+      case FAILED:
+      case CANCELED:
+        updateWrapper.set(FlinkCluster::getEndTime, new Date());
+        break;
+      case STARTING:
+        updateWrapper.set(FlinkCluster::getStartTime, new Date());
+        break;
+      default:
+        break;
+    }
+
     update(updateWrapper);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index b6351fd7f..9b256742c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
@@ -111,10 +112,17 @@ public class FlinkClusterWatcher {
         EXECUTOR.execute(
             () -> {
               ClusterState state = getClusterState(flinkCluster);
-              if (ClusterState.isFailed(state)) {
-                flinkClusterService.updateClusterState(flinkCluster.getId(), state);
-                unWatching(flinkCluster);
-                alert(flinkCluster, state);
+              switch (state) {
+                case FAILED:
+                case LOST:
+                case UNKNOWN:
+                case KILLED:
+                  flinkClusterService.updateClusterState(flinkCluster.getId(), state);
+                  unWatching(flinkCluster);
+                  alert(flinkCluster, state);
+                  break;
+                default:
+                  break;
               }
             }));
   }
@@ -177,8 +185,8 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
-    final ClusterState state = getStateFromFlinkRestApi(flinkCluster);
-    if (ClusterState.isFailed(state)) {
+    ClusterState state = getStateFromFlinkRestApi(flinkCluster);
+    if (ClusterState.LOST == state) {
       return getStateFromYarnRestApi(flinkCluster);
     }
     return state;
@@ -191,12 +199,9 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
-    final String address = flinkCluster.getAddress();
-    if (StringUtils.isEmpty(address)) {
-      return ClusterState.CANCELED;
-    }
-    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
-    final String flinkUrl =
+    String address = flinkCluster.getAddress();
+    String jobManagerUrl = flinkCluster.getJobManagerUrl();
+    String flinkUrl =
         StringUtils.isEmpty(jobManagerUrl)
             ? address.concat("/overview")
            : jobManagerUrl.concat("/overview");
@@ -227,7 +232,7 @@ public class FlinkClusterWatcher {
         return ClusterState.UNKNOWN;
       }
       YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
-      YarnApplicationState status = stringConvertYarnState(yarnAppInfo.getApp().getState());
+      YarnApplicationState status = HadoopUtils.toYarnState(yarnAppInfo.getApp().getState());
       if (status == null) {
         log.error(
             "cluster id:{} final application status convert failed, invalid string ",
@@ -260,21 +265,6 @@ public class FlinkClusterWatcher {
     }
   }
 
-  /**
-   * string converse yarn application state
-   *
-   * @param value
-   * @return
-   */
-  private YarnApplicationState stringConvertYarnState(String value) {
-    for (YarnApplicationState state : YarnApplicationState.values()) {
-      if (state.name().equals(value)) {
-        return state;
-      }
-    }
-    return null;
-  }
-
   /**
    * yarn application state convert cluster state
    *
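Not part of the commit, a brief reading aid: the diff above removes the dedicated starting(...)/canceling(...) methods and the ClusterState.isFailed(...) helper, and funnels all cluster-state bookkeeping through updateClusterState(id, state), which also decides whether to stamp a start time or an end time. Below is a minimal, self-contained Java sketch of that dispatch; the class, method, and variable names here are illustrative only, and the real logic lives in FlinkClusterServiceImpl#updateClusterState and FlinkClusterWatcher.

// Illustrative sketch only -- not code from the repository.
import java.util.Date;

public class ClusterStateUpdateSketch {

  enum ClusterState { STARTING, RUNNING, CANCELING, CANCELED, FAILED, LOST, UNKNOWN, KILLED }

  // One entry point replaces the removed starting(...) and canceling(...) methods:
  // terminal states record an end time, STARTING records a start time.
  static void updateClusterState(long clusterId, ClusterState state) {
    switch (state) {
      case KILLED:
      case UNKNOWN:
      case LOST:
      case FAILED:
      case CANCELED:
        System.out.printf("cluster %d -> %s, endTime=%s%n", clusterId, state, new Date());
        break;
      case STARTING:
        System.out.printf("cluster %d -> %s, startTime=%s%n", clusterId, state, new Date());
        break;
      default:
        System.out.printf("cluster %d -> %s%n", clusterId, state);
        break;
    }
  }

  public static void main(String[] args) {
    updateClusterState(100L, ClusterState.STARTING);   // was: starting(flinkCluster)
    updateClusterState(100L, ClusterState.CANCELING);  // was: canceling(flinkCluster)
    updateClusterState(100L, ClusterState.FAILED);     // watcher path: unWatching + alert
  }
}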
