This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d7166dcd6 [Improve] Application state improvement (#3128)
d7166dcd6 is described below
commit d7166dcd6e696cc8c8c339fdb8dcbc569682e641
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 17 00:25:27 2023 -0500
[Improve] Application state improvement (#3128)
---
.../console/core/annotation/AppUpdated.java | 5 +--
.../console/core/aspect/StreamParkAspect.java | 6 ++--
.../console/core/bean/AlertProbeMsg.java | 29 ++++++++-------
.../console/core/entity/Application.java | 24 ++++---------
.../impl/ApplicationActionServiceImpl.java | 18 +++++-----
.../impl/ApplicationInfoServiceImpl.java | 15 ++++----
.../impl/ApplicationManageServiceImpl.java | 6 ++--
.../core/service/impl/AppBuildPipeServiceImpl.java | 12 +++----
.../core/service/impl/ProjectServiceImpl.java | 8 ++---
.../core/service/impl/SavePointServiceImpl.java | 10 +++---
...nkHttpWatcher.java => FlinkAppHttpWatcher.java} | 42 +++++++++++-----------
...thProbingTask.java => FlinkAppLostWatcher.java} | 40 +++++++++------------
...rocessor.java => FlinkCheckpointProcessor.java} | 6 ++--
.../core/task/FlinkK8sChangeEventListener.java | 6 ++--
.../console/core/task/FlinkK8sWatcherWrapper.java | 2 +-
.../resources/mapper/core/ApplicationMapper.xml | 6 ++--
.../service/ApplicationManageServiceITest.java | 8 ++---
17 files changed, 114 insertions(+), 129 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
index 97c77a79d..cfb2a5563 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.core.annotation;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
+
import org.aspectj.lang.ProceedingJoinPoint;
import java.lang.annotation.ElementType;
@@ -29,8 +31,7 @@ import java.lang.annotation.Target;
* application state update, need to add this annotation, This annotation
marks which methods will
* cause the application to be updated, Will work together with {@link
*
org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)},
The
- * final purpose will be refresh {@link
- * org.apache.streampark.console.core.task.FlinkHttpWatcher#WATCHING_APPS},
Make the state of the
+ * final purpose will be refresh {@link FlinkAppHttpWatcher#WATCHING_APPS},
Make the state of the
* job consistent with the database
*/
@Target(ElementType.METHOD)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 6d14119d5..a15583333 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -28,7 +28,7 @@ import
org.apache.streampark.console.core.enums.PermissionType;
import org.apache.streampark.console.core.enums.UserType;
import org.apache.streampark.console.core.service.CommonService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
@@ -58,7 +58,7 @@ import java.util.Objects;
@Aspect
public class StreamParkAspect {
- @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@Autowired private CommonService commonService;
@Autowired private MemberService memberService;
@Autowired private ApplicationManageService applicationManageService;
@@ -93,7 +93,7 @@ public class StreamParkAspect {
MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
log.debug("appUpdated aspect, method:{}", methodSignature.getName());
Object target = joinPoint.proceed();
- flinkHttpWatcher.init();
+ flinkAppHttpWatcher.init();
return target;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
index d42537746..f9bb31604 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.core.bean;
+import org.apache.streampark.console.core.enums.FlinkAppState;
+
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -38,19 +40,20 @@ public class AlertProbeMsg {
private Integer cancelledJobs = 0;
- public void incrementProbeJobs() {
+ public void compute(FlinkAppState state) {
this.probeJobs++;
- }
-
- public void incrementFailedJobs() {
- this.failedJobs++;
- }
-
- public void incrementLostJobs() {
- this.lostJobs++;
- }
-
- public void incrementCancelledJobs() {
- this.cancelledJobs++;
+ switch (state) {
+ case LOST:
+ this.lostJobs++;
+ break;
+ case FAILED:
+ failedJobs++;
+ break;
+ case CANCELED:
+ cancelledJobs++;
+ break;
+ default:
+ break;
+ }
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index f0b283be1..98fd8dd53 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -49,8 +49,6 @@ import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import javax.annotation.Nonnull;
-
import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
@@ -59,8 +57,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import static org.apache.streampark.console.core.enums.FlinkAppState.of;
-
@Data
@TableName("t_flink_app")
@Slf4j
@@ -275,8 +271,7 @@ public class Application implements Serializable {
public void setState(Integer state) {
this.state = state;
- FlinkAppState appState = of(this.state);
- this.tracking = shouldTracking(appState);
+ this.tracking = shouldTracking() ? 1 : 0;
}
public void setYarnQueueByHotParams() {
@@ -301,8 +296,8 @@ public class Application implements Serializable {
*
* @return 1: need to be tracked | 0: no need to be tracked.
*/
- public static Integer shouldTracking(@Nonnull FlinkAppState state) {
- switch (state) {
+ public Boolean shouldTracking() {
+ switch (getStateEnum()) {
case ADDED:
case CREATED:
case FINISHED:
@@ -310,9 +305,9 @@ public class Application implements Serializable {
case CANCELED:
case TERMINATED:
case POS_TERMINATED:
- return 0;
+ return false;
default:
- return 1;
+ return true;
}
}
@@ -322,8 +317,7 @@ public class Application implements Serializable {
* @return true: can start | false: can not start.
*/
public boolean isCanBeStart() {
- FlinkAppState state = FlinkAppState.of(getState());
- switch (state) {
+ switch (getStateEnum()) {
case ADDED:
case CREATED:
case FAILED:
@@ -340,10 +334,6 @@ public class Application implements Serializable {
}
}
- public boolean shouldBeTrack() {
- return shouldTracking(FlinkAppState.of(getState())) == 1;
- }
-
@JsonIgnore
public ReleaseState getReleaseState() {
return ReleaseState.of(release);
@@ -355,7 +345,7 @@ public class Application implements Serializable {
}
@JsonIgnore
- public FlinkAppState getFlinkAppStateEnum() {
+ public FlinkAppState getStateEnum() {
return FlinkAppState.of(state);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 451ffc453..728f3b7f4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -65,7 +65,7 @@ import
org.apache.streampark.console.core.service.VariableService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -227,7 +227,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Override
public void cancel(Application appParam) throws Exception {
- FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
+ FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionState.CANCELLING);
Application application = getById(appParam.getId());
application.setState(FlinkAppState.CANCELLING.getValue());
@@ -239,7 +239,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
applicationLog.setYarnAppId(application.getClusterId());
if (appParam.getSavePointed()) {
- FlinkHttpWatcher.addSavepoint(application.getId());
+ FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionState.CANCELLING.getValue());
@@ -250,7 +250,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
Long userId = commonService.getUserId();
if (!application.getUserId().equals(userId)) {
- FlinkHttpWatcher.addCanceledApp(application.getId(), userId);
+ FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -357,7 +357,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
- FlinkHttpWatcher.unWatching(application.getId());
+ FlinkAppHttpWatcher.unWatching(application.getId());
}
String exception = Utils.stringifyException(e);
@@ -508,8 +508,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
} else {
- FlinkHttpWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
- FlinkHttpWatcher.doWatching(application);
+ FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
+ FlinkAppHttpWatcher.doWatching(application);
}
applicationLog.setSuccess(true);
@@ -530,7 +530,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
if (isKubernetesApp(app)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(app));
} else {
- FlinkHttpWatcher.unWatching(appParam.getId());
+ FlinkAppHttpWatcher.unWatching(appParam.getId());
}
}
})
@@ -750,7 +750,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
- FlinkHttpWatcher.unWatching(application.getId());
+ FlinkAppHttpWatcher.unWatching(application.getId());
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index a292cde75..ca7ad1e04 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -40,8 +40,8 @@ import
org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SavePointService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.task.FlinkClusterWatcher;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
@@ -115,7 +115,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
Integer runningJob = 0;
// stat metrics from other than kubernetes mode
- for (Application app : FlinkHttpWatcher.getWatchingApps()) {
+ for (Application app : FlinkAppHttpWatcher.getWatchingApps()) {
if (!teamId.equals(app.getTeamId())) {
continue;
}
@@ -220,11 +220,10 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
@Override
public boolean checkAlter(Application appParam) {
Long appId = appParam.getId();
- FlinkAppState state = FlinkAppState.of(appParam.getState());
- if (!FlinkAppState.CANCELED.equals(state)) {
+ if (FlinkAppState.CANCELED != appParam.getStateEnum()) {
return false;
}
- long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId);
+ long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
long appUserId = appParam.getUserId();
return cancelUserId != -1 && cancelUserId != appUserId;
}
@@ -244,11 +243,11 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
@Override
public boolean existsRunningByClusterId(Long clusterId) {
return baseMapper.existsRunningJobByClusterId(clusterId)
- || FlinkHttpWatcher.getWatchingApps().stream()
+ || FlinkAppHttpWatcher.getWatchingApps().stream()
.anyMatch(
application ->
clusterId.equals(application.getFlinkClusterId())
- &&
FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum()));
+ &&
FlinkAppState.RUNNING.equals(application.getStateEnum()));
}
@Override
@@ -467,7 +466,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
flinkK8sObserver.watchApplication(application);
}
} else {
- FlinkHttpWatcher.doWatching(application);
+ FlinkAppHttpWatcher.doWatching(application);
}
return mapping;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index e89b16117..2dc38afb2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -53,7 +53,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
@@ -189,7 +189,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
flinkK8sObserver.unWatchById(application.getId());
}
} else {
- FlinkHttpWatcher.unWatching(appParam.getId());
+ FlinkAppHttpWatcher.unWatching(appParam.getId());
}
return true;
}
@@ -264,7 +264,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
.getCode()
.equals(record.getBuildStatus()))
.setAllowStart(
- !record.shouldBeTrack()
+ !record.shouldTracking()
&& PipelineStatus.success
.getCode()
.equals(record.getBuildStatus()))
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 147137800..a1fb0b4e8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -59,7 +59,7 @@ import
org.apache.streampark.console.core.service.SettingService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.flink.packer.docker.DockerConf;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
@@ -141,7 +141,7 @@ public class AppBuildPipeServiceImpl
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@Autowired private ApplicationConfigService applicationConfigService;
@@ -226,8 +226,8 @@ public class AppBuildPipeServiceImpl
app.setRelease(ReleaseState.RELEASING.get());
applicationManageService.updateRelease(app);
- if (flinkHttpWatcher.isWatchingApp(app.getId())) {
- flinkHttpWatcher.init();
+ if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+ flinkAppHttpWatcher.init();
}
// 1) checkEnv
@@ -358,8 +358,8 @@ public class AppBuildPipeServiceImpl
}
applicationManageService.updateRelease(app);
applicationLogService.save(applicationLog);
- if (flinkHttpWatcher.isWatchingApp(app.getId())) {
- flinkHttpWatcher.init();
+ if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+ flinkAppHttpWatcher.init();
}
}
});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index c14b66b5e..254818045 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -38,7 +38,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.mapper.ProjectMapper;
import org.apache.streampark.console.core.service.ProjectService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.task.ProjectBuildTask;
import org.apache.flink.configuration.MemorySize;
@@ -82,7 +82,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Autowired private ApplicationManageService applicationManageService;
- @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
private final ExecutorService executorService =
new ThreadPoolExecutor(
@@ -207,7 +207,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
if (buildState == BuildState.SUCCESSFUL) {
baseMapper.updateBuildTime(id);
}
- flinkHttpWatcher.init();
+ flinkAppHttpWatcher.init();
},
fileLogger -> {
List<Application> applications =
@@ -222,7 +222,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
app.setBuild(true);
this.applicationManageService.updateRelease(app);
});
- flinkHttpWatcher.init();
+ flinkAppHttpWatcher.init();
});
CompletableFuture<Void> buildTask =
CompletableFuture.runAsync(projectBuildTask, executorService);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 766d5a74f..16d09a246 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -43,7 +43,7 @@ import
org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SavePointService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.SavepointResponse;
import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
@@ -101,7 +101,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
private final ExecutorService executorService =
new ThreadPoolExecutor(
@@ -176,12 +176,12 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
- FlinkHttpWatcher.addSavepoint(application.getId());
+ FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
application.setOptionTime(new Date());
this.applicationManageService.updateById(application);
- flinkHttpWatcher.init();
+ flinkAppHttpWatcher.init();
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -266,7 +266,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
application.setOptionState(OptionState.NONE.getValue());
application.setOptionTime(new Date());
applicationManageService.update(application);
- flinkHttpWatcher.init();
+ flinkAppHttpWatcher.init();
});
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
similarity index 94%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index d61018564..ce60a1919 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -75,7 +75,7 @@ import java.util.stream.Collectors;
/** This implementation is currently used for tracing flink job on
yarn,standalone,remote mode */
@Slf4j
@Component
-public class FlinkHttpWatcher {
+public class FlinkAppHttpWatcher {
@Autowired private ApplicationManageService applicationManageService;
@Autowired private ApplicationActionService applicationActionService;
@@ -83,7 +83,7 @@ public class FlinkHttpWatcher {
@Autowired private AlertService alertService;
- @Autowired private CheckpointProcessor checkpointProcessor;
+ @Autowired private FlinkCheckpointProcessor checkpointProcessor;
@Autowired private FlinkClusterService flinkClusterService;
@@ -182,7 +182,7 @@ public class FlinkHttpWatcher {
@PreDestroy
public void doStop() {
log.info(
- "FlinkHttpWatcher StreamPark Console will be shutdown,persistent
application to database.");
+ "FlinkAppHttpWatcher StreamPark Console will be shutdown,persistent
application to database.");
WATCHING_APPS.forEach((k, v) -> applicationInfoService.persistMetrics(v));
}
@@ -209,7 +209,7 @@ public class FlinkHttpWatcher {
@VisibleForTesting
public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
Application app = WATCHING_APPS.get(appId);
- return (app == null || app.getState() == null) ? null :
FlinkAppState.of(app.getState());
+ return (app == null || app.getState() == null) ? null : app.getStateEnum();
}
private void watch(Long id, Application application) {
@@ -234,7 +234,7 @@ public class FlinkHttpWatcher {
// non-mapping
if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
log.error(
- "FlinkHttpWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
+ "FlinkAppHttpWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
savePointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
@@ -252,9 +252,9 @@ public class FlinkHttpWatcher {
cleanSavepoint(application);
cleanOptioning(optionState, id);
doPersistMetrics(application, true);
- FlinkAppState appState =
FlinkAppState.of(application.getState());
- if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
- doAlert(application,
FlinkAppState.of(application.getState()));
+ FlinkAppState appState = application.getStateEnum();
+ if (appState == FlinkAppState.FAILED || appState ==
FlinkAppState.LOST) {
+ doAlert(application, application.getStateEnum());
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationActionService.start(application, true);
@@ -456,14 +456,14 @@ public class FlinkHttpWatcher {
break;
case CANCELED:
log.info(
- "FlinkHttpWatcher getFromFlinkRestApi, job state {}, stop tracking
and delete stopFrom!",
+ "FlinkAppHttpWatcher getFromFlinkRestApi, job state {}, stop
tracking and delete stopFrom!",
currentState.name());
cleanSavepoint(application);
application.setState(currentState.getValue());
if (StopFrom.NONE.equals(stopFrom) ||
applicationInfoService.checkAlter(application)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.info(
- "FlinkHttpWatcher getFromFlinkRestApi, job cancel is not form
StreamPark,savePoint expired!");
+ "FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not
form StreamPark,savePoint expired!");
savePointService.expire(application.getId());
}
stopCanceledJob(application.getId());
@@ -483,7 +483,7 @@ public class FlinkHttpWatcher {
break;
case RESTARTING:
log.info(
- "FlinkHttpWatcher getFromFlinkRestApi, job state {},add to
starting",
+ "FlinkAppHttpWatcher getFromFlinkRestApi, job state {},add to
starting",
currentState.name());
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
break;
@@ -501,7 +501,7 @@ public class FlinkHttpWatcher {
* @param stopFrom stopFrom
*/
private void getFromYarnRestApi(Application application, StopFrom stopFrom)
throws Exception {
- log.debug("FlinkHttpWatcher getFromYarnRestApi starting...");
+ log.debug("FlinkAppHttpWatcher getFromYarnRestApi starting...");
OptionState optionState = OPTIONING.get(application.getId());
/*
@@ -511,10 +511,10 @@ public class FlinkHttpWatcher {
*/
Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
if (flag != null) {
- log.info("FlinkHttpWatcher previous state: canceling.");
+ log.info("FlinkAppHttpWatcher previous state: canceling.");
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "FlinkHttpWatcher query previous state was canceling and stopFrom
NotFound,savePoint expired!");
+ "FlinkAppHttpWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
}
application.setState(FlinkAppState.CANCELED.getValue());
@@ -526,7 +526,7 @@ public class FlinkHttpWatcher {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo == null) {
if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
- throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi
failed ");
+ throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi
failed ");
}
} else {
try {
@@ -538,7 +538,7 @@ public class FlinkHttpWatcher {
if (FlinkAppState.KILLED.equals(flinkAppState)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "FlinkHttpWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
+ "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
}
flinkAppState = FlinkAppState.CANCELED;
@@ -563,7 +563,7 @@ public class FlinkHttpWatcher {
}
} catch (Exception e) {
if
(!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
- throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi
error,", e);
+ throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi
error,", e);
}
}
}
@@ -587,7 +587,7 @@ public class FlinkHttpWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkHttpWatcher setOptioning");
+ log.info("FlinkAppHttpWatcher setOptioning");
OPTIONING.put(appId, state);
if (state.equals(OptionState.CANCELLING)) {
STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
@@ -598,7 +598,7 @@ public class FlinkHttpWatcher {
if (isKubernetesApp(application)) {
return;
}
- log.info("FlinkHttpWatcher add app to tracking,appId:{}",
application.getId());
+ log.info("FlinkAppHttpWatcher add app to tracking,appId:{}",
application.getId());
WATCHING_APPS.put(application.getId(), application);
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
}
@@ -607,7 +607,7 @@ public class FlinkHttpWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkHttpWatcher add app to savepoint,appId:{}", appId);
+ log.info("FlinkAppHttpWatcher add app to savepoint,appId:{}", appId);
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
}
@@ -615,7 +615,7 @@ public class FlinkHttpWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkHttpWatcher stop app,appId:{}", appId);
+ log.info("FlinkAppHttpWatcher stop app,appId:{}", appId);
WATCHING_APPS.remove(appId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
similarity index 83%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
index d19865f34..d9f7381fd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
@@ -36,15 +36,17 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static org.apache.streampark.console.core.enums.FlinkAppState.LOST;
import static
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
import static
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
/** This implementation is currently used for probe on yarn,remote,K8s mode */
@Slf4j
@Component
-public class AutoHealthProbingTask {
+public class FlinkAppLostWatcher {
@Autowired private ApplicationManageService applicationManageService;
@@ -63,14 +65,14 @@ public class AutoHealthProbingTask {
private long lastWatchTime = 0L;
- private Boolean isProbing = false;
+ private AtomicBoolean isProbing = new AtomicBoolean(false);
private Short retryAttempts = PROBE_RETRY_COUNT;
@Scheduled(fixedDelay = 1000)
- private void schedule() {
+ private void start() {
long timeMillis = System.currentTimeMillis();
- if (isProbing) {
+ if (isProbing.get()) {
if (timeMillis - lastWatchTime >= PROBE_WAIT_INTERVAL.toMillis()) {
handleProbeResults();
lastWatchTime = timeMillis;
@@ -78,28 +80,28 @@ public class AutoHealthProbingTask {
} else {
if (timeMillis - lastWatchTime >= PROBE_INTERVAL.toMillis()) {
lastWatchTime = timeMillis;
- probe(Collections.emptyList());
+ watch(Collections.emptyList());
}
}
}
- public void probe(List<Application> applications) {
+ public void watch(List<Application> applications) {
List<Application> probeApplication =
applications.isEmpty() ? applicationManageService.getProbeApps() :
applications;
if (probeApplication.isEmpty()) {
log.info("there is no application that needs to be probe");
return;
}
- isProbing = true;
+ isProbing.set(true);
probeApplication =
probeApplication.stream()
.filter(application ->
FlinkAppState.isLost(application.getState()))
.collect(Collectors.toList());
- updateProbingState(probeApplication);
+ updateState(probeApplication);
probeApplication.stream().forEach(this::monitorApplication);
}
- private void updateProbingState(List<Application> applications) {
+ private void updateState(List<Application> applications) {
applications.stream()
.filter(application -> FlinkAppState.isLost(application.getState()))
.forEach(
@@ -113,7 +115,7 @@ public class AutoHealthProbingTask {
private void handleProbeResults() {
List<Application> probeApps = applicationManageService.getProbeApps();
if (shouldRetry(probeApps)) {
- probe(probeApps);
+ watch(probeApps);
} else {
List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps);
alertProbeMsgs.stream().forEach(this::alert);
@@ -129,7 +131,7 @@ public class AutoHealthProbingTask {
});
applicationManageService.updateBatchById(applications);
retryAttempts = PROBE_RETRY_COUNT;
- isProbing = false;
+ isProbing.set(false);
}
private void alert(AlertProbeMsg alertProbeMsg) {
@@ -155,17 +157,9 @@ public class AutoHealthProbingTask {
apps.forEach(
app -> {
alertProbeMsg.setUser(app.getUserName());
- alertProbeMsg.incrementProbeJobs();
- if (app.getState() ==
FlinkAppState.LOST.getValue()) {
- alertProbeMsg.incrementLostJobs();
- } else if (app.getState() ==
FlinkAppState.FAILED.getValue()) {
- alertProbeMsg.incrementFailedJobs();
- } else if (app.getState() ==
FlinkAppState.CANCELED.getValue()) {
- alertProbeMsg.incrementCancelledJobs();
- }
+ alertProbeMsg.compute(app.getStateEnum());
alertIds.add(app.getAlertId());
});
-
alertProbeMsg.setAlertId(alertIds);
return alertProbeMsg;
})))
@@ -176,14 +170,12 @@ public class AutoHealthProbingTask {
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
} else {
- FlinkHttpWatcher.doWatching(application);
+ FlinkAppHttpWatcher.doWatching(application);
}
}
private Boolean shouldRetry(List<Application> applications) {
- return applications.stream()
- .anyMatch(
- application -> FlinkAppState.LOST.getValue() ==
application.getState().intValue())
+ return applications.stream().anyMatch(application ->
application.getStateEnum() == LOST)
&& (retryAttempts-- > 0);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
index 02d4f8ca1..5e30557c9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component
-public class CheckpointProcessor {
+public class FlinkCheckpointProcessor {
private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
private static final Integer SAVEPOINT_CACHE_HOUR = 1;
@@ -70,7 +70,7 @@ public class CheckpointProcessor {
@Autowired private SavePointService savePointService;
- @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
public void process(Application application, @Nonnull CheckPoints
checkPoints) {
checkPoints.getLatestCheckpoint().forEach(checkPoint ->
process(application, checkPoint));
@@ -86,7 +86,7 @@ public class CheckpointProcessor {
if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
saveSavepoint(checkPoint, application.getId());
- flinkHttpWatcher.cleanSavepoint(application);
+ flinkAppHttpWatcher.cleanSavepoint(application);
return;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 3006a1b6b..45560c35f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -72,7 +72,7 @@ public class FlinkK8sChangeEventListener {
@Lazy @Autowired private AlertService alertService;
- @Lazy @Autowired private CheckpointProcessor checkpointProcessor;
+ @Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor;
private final ExecutorService executor =
new ThreadPoolExecutor(
@@ -104,7 +104,7 @@ public class FlinkK8sChangeEventListener {
applicationInfoService.persistMetrics(app);
// email alerts when necessary
- FlinkAppState state = FlinkAppState.of(app.getState());
+ FlinkAppState state = app.getStateEnum();
if (FlinkAppState.FAILED.equals(state)
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
@@ -175,7 +175,7 @@ public class FlinkK8sChangeEventListener {
// infer the final flink job state
Enumeration.Value state =
FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
- jobStatus.jobState(),
toK8sFlinkJobState(FlinkAppState.of(app.getState())));
+ jobStatus.jobState(), toK8sFlinkJobState(app.getStateEnum()));
// corrective start-time / end-time / duration
long preStartTime = app.getStartTime() != null ?
app.getStartTime().getTime() : 0;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 7840d06e6..2d5a1a339 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -116,7 +116,7 @@ public class FlinkK8sWatcherWrapper {
}
// filter out the application that should be tracking
return k8sApplication.stream()
- .filter(app ->
!FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+ .filter(app ->
!FlinkJobState.isEndState(toK8sFlinkJobState(app.getStateEnum())))
.map(Bridge::toTrackId)
.collect(Collectors.toList());
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index c25a504a4..f57b8efe7 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -265,9 +265,9 @@
else u.nick_name
end as nick_name
from t_flink_app t
- inner join t_user u
- on t.user_id = u.user_id
- where (t.tracking = 1 and t.state = 13) or t.probing = 1
+ inner join t_user u
+ on t.user_id = u.user_id
+ where t.probing = 1 or (t.tracking = 1 and t.state = 13)
</select>
<update id="mapping"
parameterType="org.apache.streampark.console.core.entity.Application">
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
index 58e36b501..79ee5ca43 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
@@ -29,7 +29,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -45,7 +45,7 @@ import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import static
org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL;
+import static
org.apache.streampark.console.core.task.FlinkAppHttpWatcher.WATCHING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -70,7 +70,7 @@ class ApplicationManageServiceITest extends
SpringIntegrationTestBase {
@Autowired private FlinkSqlService sqlService;
- @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@BeforeAll
static void setup() {
@@ -145,7 +145,7 @@ class ApplicationManageServiceITest extends
SpringIntegrationTestBase {
CompletableFuture.supplyAsync(
() -> {
while (true) {
- if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId())
+ if
(flinkAppHttpWatcher.tryQueryFlinkAppState(application.getId())
== FlinkAppState.RUNNING) {
break;
}