This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new b69b19877 [Improve] executors improvement (#3535)
b69b19877 is described below
commit b69b198779f7b68edd09281e803f2baf8502974d
Author: benjobs <[email protected]>
AuthorDate: Tue Feb 6 12:44:59 2024 +0800
[Improve] executors improvement (#3535)
* [Improve] ThreadPoolExecutor improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../apache/streampark/common/util/YarnUtils.scala | 12 +-
.../console/core/annotation/AppUpdated.java | 5 +-
.../console/core/aspect/StreamParkAspect.java | 6 +-
.../core/service/alert/impl/AlertServiceImpl.java | 19 ++-
.../core/service/impl/AppBuildPipeServiceImpl.java | 42 +++--
.../core/service/impl/ApplicationServiceImpl.java | 57 ++++---
.../core/service/impl/FlinkClusterServiceImpl.java | 27 +--
.../core/service/impl/ProjectServiceImpl.java | 23 +--
.../core/service/impl/SavePointServiceImpl.java | 26 +--
.../console/core/task/CheckpointProcessor.java | 4 +-
...ESTAPIWatcher.java => FlinkAppHttpWatcher.java} | 186 +++++++++++----------
.../core/task/FlinkK8sChangeEventListener.java | 19 +--
12 files changed, 228 insertions(+), 198 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 9400f688f..7be3bcbf3 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -222,11 +222,11 @@ object YarnUtils extends Logger {
* @return
*/
@throws[IOException]
- def restRequest(url: String): String = {
+ def restRequest(url: String, timeout: Int = 5000): String = {
if (url == null) return null
url match {
case u if u.matches("^http(|s)://.*") =>
- Try(request(url)) match {
+ Try(request(url, timeout)) match {
case Success(v) => v
case Failure(e) =>
if (hasYarnHttpKerberosAuth) {
@@ -236,11 +236,11 @@ object YarnUtils extends Logger {
}
}
case _ =>
- Try(request(s"${getRMWebAppURL()}/$url")) match {
+ Try(request(s"${getRMWebAppURL()}/$url", timeout)) match {
case Success(v) => v
case Failure(_) =>
Utils.retry[String](5) {
- request(s"${getRMWebAppURL(true)}/$url")
+ request(s"${getRMWebAppURL(true)}/$url", timeout)
} match {
case Success(v) => v
case Failure(e) =>
@@ -250,8 +250,8 @@ object YarnUtils extends Logger {
}
}
- private[this] def request(reqUrl: String): String = {
- val config = RequestConfig.custom.setConnectTimeout(5000).build
+ private[this] def request(reqUrl: String, timeout: Int): String = {
+ val config = RequestConfig.custom.setConnectTimeout(timeout).build
if (hasYarnHttpKerberosAuth) {
HadoopUtils
.getUgi()
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
index 345a361c7..cfb2a5563 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.core.annotation;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
+
import org.aspectj.lang.ProceedingJoinPoint;
import java.lang.annotation.ElementType;
@@ -29,8 +31,7 @@ import java.lang.annotation.Target;
* application state update, need to add this annotation, This annotation
marks which methods will
* cause the application to be updated, Will work together with {@link
*
org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)},
The
- * final purpose will be refresh {@link
- * org.apache.streampark.console.core.task.FlinkRESTAPIWatcher#WATCHING_APPS},
Make the state of the
+ * final purpose will be refresh {@link FlinkAppHttpWatcher#WATCHING_APPS},
Make the state of the
* job consistent with the database
*/
@Target(ElementType.METHOD)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 90f7eade0..f9f2e6372 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -26,7 +26,7 @@ import
org.apache.streampark.console.core.enums.PermissionType;
import org.apache.streampark.console.core.enums.UserType;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ServiceHelper;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
@@ -56,7 +56,7 @@ import java.util.Objects;
@Aspect
public class StreamParkAspect {
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@Autowired private ServiceHelper serviceHelper;
@Autowired private MemberService memberService;
@Autowired private ApplicationService applicationService;
@@ -91,7 +91,7 @@ public class StreamParkAspect {
MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
log.debug("appUpdated aspect, method:{}", methodSignature.getName());
Object target = joinPoint.proceed();
- flinkRESTAPIWatcher.init();
+ flinkAppHttpWatcher.initialize();
return target;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 19a40249e..b08b0d2cf 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.alert.impl;
+import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.base.util.SpringContextUtils;
@@ -39,22 +40,35 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class AlertServiceImpl implements AlertService {
@Autowired private AlertConfigService alertConfigService;
+ private final ExecutorService notifyExecutor =
+ new ThreadPoolExecutor(
+ 1,
+ 2,
+ 20L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-notify"));
+
@Override
public void alert(Application application, CheckPointStatus
checkPointStatus) {
AlertTemplate alertTemplate = AlertTemplate.of(application,
checkPointStatus);
- alert(application, alertTemplate);
+ notifyExecutor.submit(() -> alert(application, alertTemplate));
}
@Override
public void alert(Application application, FlinkAppState appState) {
AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
- alert(application, alertTemplate);
+ notifyExecutor.submit(() -> alert(application, alertTemplate));
}
private void alert(Application application, AlertTemplate alertTemplate) {
@@ -73,6 +87,7 @@ public class AlertServiceImpl implements AlertService {
@Override
public boolean alert(AlertConfigWithParams params, AlertTemplate
alertTemplate)
throws AlertException {
+
List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
if (CollectionUtils.isEmpty(alertTypes)) {
return true;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 00f46799e..b735797c0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -51,7 +51,7 @@ import
org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.SettingService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.flink.packer.docker.DockerConf;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;
@@ -89,6 +89,7 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
+import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
@@ -131,20 +132,10 @@ public class AppBuildPipeServiceImpl
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@Autowired private ApplicationConfigService applicationConfigService;
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
- new ThreadPoolExecutor.AbortPolicy());
-
private static final Cache<Long, DockerPullSnapshot>
DOCKER_PULL_PG_SNAPSHOTS =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
@@ -154,6 +145,22 @@ public class AppBuildPipeServiceImpl
private static final Cache<Long, DockerPushSnapshot>
DOCKER_PUSH_PG_SNAPSHOTS =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
+ private static final int CPU_NUM = Math.max(4,
Runtime.getRuntime().availableProcessors() * 2);
+
+ private final ExecutorService buildPipelineExecutor =
+ new ThreadPoolExecutor(
+ 1,
+ CPU_NUM,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-flink-buildPipeline"));
+
+ @PreDestroy
+ public void shutdown() {
+ buildPipelineExecutor.shutdown();
+ }
+
@Override
public boolean buildApplication(@Nonnull Application app, ApplicationLog
applicationLog) {
// 1) flink sql setDependency
@@ -183,8 +190,8 @@ public class AppBuildPipeServiceImpl
app.setRelease(ReleaseState.RELEASING.get());
applicationService.updateRelease(app);
- if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
- flinkRESTAPIWatcher.init();
+ if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+ flinkAppHttpWatcher.initialize();
}
// 1) checkEnv
@@ -257,8 +264,8 @@ public class AppBuildPipeServiceImpl
}
applicationService.updateRelease(app);
applicationLogService.save(applicationLog);
- if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
- flinkRESTAPIWatcher.init();
+ if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+ flinkAppHttpWatcher.initialize();
}
}
});
@@ -291,8 +298,7 @@ public class AppBuildPipeServiceImpl
DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
- // async release pipeline
- executorService.submit((Runnable) pipeline::launch);
+ buildPipelineExecutor.submit(pipeline::launch);
return saved;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index bd189cdbe..8360f7439 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,8 +78,8 @@ import
org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -126,6 +126,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
@@ -172,16 +173,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-deploy-executor"),
- new ThreadPoolExecutor.AbortPolicy());
-
private static final Pattern JOB_NAME_PATTERN =
Pattern.compile("^[.\\x{4e00}-\\x{9fa5}A-Za-z\\d_\\-\\s]+$");
@@ -221,11 +212,27 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
+ private static final int CPU_NUM = Math.max(2,
Runtime.getRuntime().availableProcessors() * 4);
+
+ private final ExecutorService bootstrapExecutor =
+ new ThreadPoolExecutor(
+ 1,
+ CPU_NUM,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-flink-app-bootstrap"));
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
}
+ @PreDestroy
+ public void shutdown() {
+ bootstrapExecutor.shutdown();
+ }
+
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
new ConcurrentHashMap<>();
@@ -243,7 +250,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
Integer runningJob = 0;
// stat metrics from other than kubernetes mode
- for (Application app : FlinkRESTAPIWatcher.getWatchingApps()) {
+ for (Application app : FlinkAppHttpWatcher.getWatchingApps()) {
if (!teamId.equals(app.getTeamId())) {
continue;
}
@@ -450,7 +457,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (!FlinkAppState.CANCELED.equals(state)) {
return false;
}
- long cancelUserId = FlinkRESTAPIWatcher.getCanceledJobUserId(appId);
+ long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
long appUserId = application.getUserId();
return cancelUserId != -1 && cancelUserId != appUserId;
}
@@ -544,7 +551,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (exists) {
return true;
}
- for (Application application : FlinkRESTAPIWatcher.getWatchingApps()) {
+ for (Application application : FlinkAppHttpWatcher.getWatchingApps()) {
if (clusterId.equals(application.getFlinkClusterId())
&& FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum()))
{
return true;
@@ -1238,14 +1245,14 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (application.isKubernetesModeJob()) {
flinkK8sWatcher.doWatching(k8sWatcherWrapper.toTrackId(application));
} else {
- FlinkRESTAPIWatcher.doWatching(application);
+ FlinkAppHttpWatcher.doWatching(application);
}
return mapping;
}
@Override
public void cancel(Application appParam) throws Exception {
- FlinkRESTAPIWatcher.setOptionState(appParam.getId(),
OptionState.CANCELLING);
+ FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionState.CANCELLING);
Application application = getById(appParam.getId());
application.setState(FlinkAppState.CANCELLING.getValue());
@@ -1257,7 +1264,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
applicationLog.setYarnAppId(application.getClusterId());
if (appParam.getSavePointed()) {
- FlinkRESTAPIWatcher.addSavepoint(application.getId());
+ FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionState.CANCELLING.getValue());
@@ -1268,7 +1275,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
Long userId = serviceHelper.getUserId();
if (!application.getUserId().equals(userId)) {
- FlinkRESTAPIWatcher.addCanceledApp(application.getId(), userId);
+ FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -1315,7 +1322,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
final Date triggerTime = new Date();
CompletableFuture<CancelResponse> cancelFuture =
- CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest),
executorService);
+ CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest),
bootstrapExecutor);
cancelFutureMap.put(application.getId(), cancelFuture);
@@ -1347,7 +1354,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (application.isKubernetesModeJob()) {
flinkK8sWatcher.unWatching(trackId);
} else {
- FlinkRESTAPIWatcher.unWatching(application.getId());
+ FlinkAppHttpWatcher.unWatching(application.getId());
}
}
return;
@@ -1604,7 +1611,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
exposedType);
CompletableFuture<SubmitResponse> future =
- CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest),
executorService);
+ CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest),
bootstrapExecutor);
startFutureMap.put(application.getId(), future);
@@ -1631,7 +1638,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
TrackId trackId = k8sWatcherWrapper.toTrackId(application);
flinkK8sWatcher.unWatching(trackId);
} else {
- FlinkRESTAPIWatcher.unWatching(appParam.getId());
+ FlinkAppHttpWatcher.unWatching(appParam.getId());
}
}
return;
@@ -1692,8 +1699,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
} else {
- FlinkRESTAPIWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
- FlinkRESTAPIWatcher.doWatching(application);
+ FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
+ FlinkAppHttpWatcher.doWatching(application);
}
// update app
updateById(application);
@@ -1786,7 +1793,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
TrackId id = k8sWatcherWrapper.toTrackId(application);
flinkK8sWatcher.doWatching(id);
} else {
- FlinkRESTAPIWatcher.unWatching(application.getId());
+ FlinkAppHttpWatcher.unWatching(application.getId());
}
// kill application
if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 88690f50e..1240313d9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -73,16 +73,6 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
private static final String ERROR_CLUSTER_QUEUE_HINT =
"Queue label '%s' isn't available in database, please add it first.";
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-cluster-executor"),
- new ThreadPoolExecutor.AbortPolicy());
-
@Autowired private FlinkEnvService flinkEnvService;
@Autowired private ServiceHelper serviceHelper;
@@ -93,6 +83,17 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Autowired private SettingService settingService;
+ private static final int CPU_NUM = Math.max(4,
Runtime.getRuntime().availableProcessors() * 2);
+
+ private final ExecutorService bootstrapExecutor =
+ new ThreadPoolExecutor(
+ 1,
+ CPU_NUM,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-flink-cluster-bootstrap"));
+
@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
@@ -169,10 +170,10 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
try {
// 1) deployRequest
DeployRequest deployRequest = getDeployRequest(flinkCluster);
- log.info("deploy cluster request: {}", deployRequest);
+ log.info("deploy cluster request: {}", deployRequest);
Future<DeployResponse> future =
- executorService.submit(() -> FlinkClient.deploy(deployRequest));
+ bootstrapExecutor.submit(() -> FlinkClient.deploy(deployRequest));
DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
if (deployResponse.error() != null) {
throw new ApiDetailException(
@@ -311,7 +312,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
DeployRequest deployRequest = getDeployRequest(flinkCluster);
try {
Future<ShutDownResponse> future =
- executorService.submit(() -> FlinkClient.shutdown(deployRequest));
+ bootstrapExecutor.submit(() -> FlinkClient.shutdown(deployRequest));
ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
if (shutDownResponse.error() != null) {
throw new ApiDetailException(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 931dd712f..0285329bb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -38,7 +38,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.mapper.ProjectMapper;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ProjectService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.task.ProjectBuildTask;
import org.apache.flink.configuration.MemorySize;
@@ -80,17 +80,18 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Autowired private ApplicationService applicationService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
- private final ExecutorService executorService =
+ private static final int CPU_NUM = Math.max(4,
Runtime.getRuntime().availableProcessors() * 2);
+
+ private final ExecutorService projectBuildExecutor =
new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
+ 1,
+ CPU_NUM,
60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-build-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-project-build"));
@Override
public RestResponse create(Project project) {
@@ -204,7 +205,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
if (buildState == BuildState.SUCCESSFUL) {
baseMapper.updateBuildTime(id);
}
- flinkRESTAPIWatcher.init();
+ flinkAppHttpWatcher.initialize();
},
fileLogger -> {
List<Application> applications =
@@ -219,10 +220,10 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
app.setBuild(true);
this.applicationService.updateRelease(app);
});
- flinkRESTAPIWatcher.init();
+ flinkAppHttpWatcher.initialize();
});
CompletableFuture<Void> buildTask =
- CompletableFuture.runAsync(projectBuildTask, executorService);
+ CompletableFuture.runAsync(projectBuildTask, projectBuildExecutor);
// TODO May need to define parameters to set the build timeout in the
future.
CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 8dde05c8b..3f132f014 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -43,7 +43,7 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SavePointService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.SavepointResponse;
import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
@@ -93,17 +93,18 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
- private final ExecutorService executorService =
+ private static final int CPU_NUM = Math.max(2,
Runtime.getRuntime().availableProcessors());
+
+ private final ExecutorService flinkTriggerExecutor =
new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
+ 1,
+ CPU_NUM,
60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("trigger-savepoint-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));
@Override
public void expire(Long appId) {
@@ -288,12 +289,12 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
- FlinkRESTAPIWatcher.addSavepoint(application.getId());
+ FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
application.setOptionTime(new Date());
this.applicationService.updateById(application);
- flinkRESTAPIWatcher.init();
+ flinkAppHttpWatcher.initialize();
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -316,7 +317,8 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
application.getK8sNamespace());
CompletableFuture<SavepointResponse> savepointFuture =
- CompletableFuture.supplyAsync(() ->
FlinkClient.triggerSavepoint(request), executorService);
+ CompletableFuture.supplyAsync(
+ () -> FlinkClient.triggerSavepoint(request), flinkTriggerExecutor);
handleSavepointResponseFuture(application, applicationLog,
savepointFuture);
}
@@ -360,7 +362,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
application.setOptionState(OptionState.NONE.getValue());
application.setOptionTime(new Date());
applicationService.update(application);
- flinkRESTAPIWatcher.init();
+ flinkAppHttpWatcher.initialize();
});
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 42b1a4137..092f6b6ae 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -70,7 +70,7 @@ public class CheckpointProcessor {
@Autowired private SavePointService savePointService;
- @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+ @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
public void process(Application application, @Nonnull CheckPoints
checkPoints) {
checkPoints.getLatestCheckpoint().forEach(checkPoint ->
process(application, checkPoint));
@@ -86,7 +86,7 @@ public class CheckpointProcessor {
if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
savepointedCache.put(checkPointKey.getSavePointId(),
DEFAULT_FLAG_BYTE);
saveSavepoint(checkPoint, application.getId());
- flinkRESTAPIWatcher.cleanSavepoint(application);
+ flinkAppHttpWatcher.cleanSavepoint(application);
return;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
similarity index 83%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index f1b1b5a61..9f406e655 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -68,7 +68,7 @@ import java.util.stream.Collectors;
/** This implementation is currently used for tracing flink job on
yarn,standalone,remote mode */
@Slf4j
@Component
-public class FlinkRESTAPIWatcher {
+public class FlinkAppHttpWatcher {
@Autowired private ApplicationService applicationService;
@@ -85,6 +85,8 @@ public class FlinkRESTAPIWatcher {
// option interval within 10 seconds
private static final long OPTION_INTERVAL = 1000L * 10;
+ private static final int HTTP_TIMEOUT = 2000;
+
/**
*
*
@@ -142,17 +144,19 @@ public class FlinkRESTAPIWatcher {
private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
- private static final ExecutorService EXECUTOR =
+ private static final int CPU_NUM = Math.max(4,
Runtime.getRuntime().availableProcessors() * 2);
+
+ private static final ExecutorService watchExecutor =
new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
+ CPU_NUM,
+ CPU_NUM * 5,
60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("flink-restapi-watching-executor"));
+ new LinkedBlockingQueue<>(),
+ ThreadUtils.threadFactory("streampark-flink-app-watching"));
@PostConstruct
- public void init() {
+ public void initialize() {
WATCHING_APPS.clear();
List<Application> applications =
applicationService.list(
@@ -165,7 +169,7 @@ public class FlinkRESTAPIWatcher {
@PreDestroy
public void doStop() {
log.info(
- "FlinkRESTAPIWatcher StreamPark Console will be shutdown,persistent
application to database.");
+ "[StreamPark][FlinkAppHttpWatcher] StreamPark Console will be
shutdown,persistent application to database.");
WATCHING_APPS.forEach((k, v) -> applicationService.persistMetrics(v));
}
@@ -182,90 +186,87 @@ public class FlinkRESTAPIWatcher {
// The application has been started at the first time, or the front-end is
operating start/stop,
// need to return status info immediately.
if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
- doWatch();
+ watch();
} else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL)
{
// The last operation time is less than option interval.(10 seconds)
- doWatch();
+ watch();
} else if (System.currentTimeMillis() - lastWatchingTime >=
WATCHING_INTERVAL) {
// Normal information obtain, check if there is 5 seconds interval
between this time and the
// last time.(once every 5 seconds)
- doWatch();
+ watch();
}
}
- private void doWatch() {
+ private void watch() {
lastWatchingTime = System.currentTimeMillis();
- for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
- watch(entry.getKey(), entry.getValue());
+ for (Application application : WATCHING_APPS.values()) {
+ watchExecutor.execute(
+ () -> {
+ try {
+ // query status from flink rest api
+ getFromFlinkRestApi(application);
+ } catch (Exception flinkException) {
+ // query status from yarn rest api
+ try {
+ getFromYarnRestApi(application);
+ } catch (Exception yarnException) {
+ doStateFailed(application);
+ }
+ }
+ });
}
}
- private void watch(Long key, Application application) {
- EXECUTOR.execute(
- () -> {
- final StopFrom stopFrom =
- STOP_FROM_MAP.getOrDefault(key, null) == null
- ? StopFrom.NONE
- : STOP_FROM_MAP.get(key);
- final OptionState optionState = OPTIONING.get(key);
+ private void doStateFailed(Application application) {
+ /*
+ Query from flink's restAPI and yarn's restAPI both failed.
+ In this case, it is necessary to decide whether to return to the final
state depending on the state being operated
+ */
+ StopFrom stopFrom = getStopFrom(application);
+ OptionState optionState = OPTIONING.get(application.getId());
+ if (optionState == null || !optionState.equals(OptionState.STARTING)) {
+ // non-mapping
+ if (application.getState() != FlinkAppState.MAPPING.getValue()) {
+ log.error(
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
+ if (StopFrom.NONE.equals(stopFrom)) {
+ savePointService.expire(application.getId());
+ application.setState(FlinkAppState.LOST.getValue());
+ alertService.alert(application, FlinkAppState.LOST);
+ } else {
+ application.setState(FlinkAppState.CANCELED.getValue());
+ }
+ }
+ /*
+ This step means that the above two ways to get information have failed,
and this step is the last step,
+ which will directly identify the mission as cancelled or lost.
+ Need clean savepoint.
+ */
+ application.setEndTime(new Date());
+ cleanSavepoint(application);
+ cleanOptioning(optionState, application.getId());
+ doPersistMetrics(application, true);
+ FlinkAppState appState = FlinkAppState.of(application.getState());
+ if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
+ alertService.alert(application,
FlinkAppState.of(application.getState()));
+ if (appState.equals(FlinkAppState.FAILED)) {
try {
- // query status from flink rest api
- getFromFlinkRestApi(application, stopFrom);
- } catch (Exception flinkException) {
- // query status from yarn rest api
- try {
- getFromYarnRestApi(application, stopFrom);
- } catch (Exception yarnException) {
- /*
- Query from flink's restAPI and yarn's restAPI both failed.
- In this case, it is necessary to decide whether to return to
the final state depending on the state being operated
- */
- if (optionState == null ||
!optionState.equals(OptionState.STARTING)) {
- // non-mapping
- if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
- log.error(
- "FlinkRESTAPIWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
- if (StopFrom.NONE.equals(stopFrom)) {
- savePointService.expire(application.getId());
- application.setState(FlinkAppState.LOST.getValue());
- alertService.alert(application, FlinkAppState.LOST);
- } else {
- application.setState(FlinkAppState.CANCELED.getValue());
- }
- }
- /*
- This step means that the above two ways to get information
have failed, and this step is the last step,
- which will directly identify the mission as cancelled or lost.
- Need clean savepoint.
- */
- application.setEndTime(new Date());
- cleanSavepoint(application);
- cleanOptioning(optionState, key);
- doPersistMetrics(application, true);
- FlinkAppState appState =
FlinkAppState.of(application.getState());
- if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
- alertService.alert(application,
FlinkAppState.of(application.getState()));
- if (appState.equals(FlinkAppState.FAILED)) {
- try {
- applicationService.start(application, true);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }
- }
- }
- }
+ applicationService.start(application, true);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
}
- });
+ }
+ }
+ }
}
/**
* Get the current task running status information from flink restapi
*
* @param application application
- * @param stopFrom stopFrom
*/
- private void getFromFlinkRestApi(Application application, StopFrom stopFrom)
throws Exception {
+ private void getFromFlinkRestApi(Application application) throws Exception {
+ StopFrom stopFrom = getStopFrom(application);
JobsOverview jobsOverview = httpJobsOverview(application);
Optional<JobsOverview.Job> optional;
ExecutionMode execMode = application.getExecutionModeEnum();
@@ -312,6 +313,12 @@ public class FlinkRESTAPIWatcher {
}
}
+ private StopFrom getStopFrom(Application application) {
+ return STOP_FROM_MAP.getOrDefault(application.getId(), null) == null
+ ? StopFrom.NONE
+ : STOP_FROM_MAP.get(application.getId());
+ }
+
/**
* handle job overview
*
@@ -445,14 +452,14 @@ public class FlinkRESTAPIWatcher {
break;
case CANCELED:
log.info(
- "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {}, stop
tracking and delete stopFrom!",
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state
{}, stop tracking and delete stopFrom!",
currentState.name());
cleanSavepoint(application);
application.setState(currentState.getValue());
if (StopFrom.NONE.equals(stopFrom) ||
applicationService.checkAlter(application)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.info(
- "FlinkRESTAPIWatcher getFromFlinkRestApi, job cancel is not
form StreamPark,savePoint expired!");
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job
cancel is not form StreamPark,savePoint expired!");
savePointService.expire(application.getId());
}
stopCanceledJob(application.getId());
@@ -472,7 +479,7 @@ public class FlinkRESTAPIWatcher {
break;
case RESTARTING:
log.info(
- "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {},add to
starting",
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state
{},add to starting",
currentState.name());
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
break;
@@ -487,10 +494,10 @@ public class FlinkRESTAPIWatcher {
* status of the task is CANCELED</strong>
*
* @param application application
- * @param stopFrom stopFrom
*/
- private void getFromYarnRestApi(Application application, StopFrom stopFrom)
throws Exception {
- log.debug("FlinkRESTAPIWatcher getFromYarnRestApi starting...");
+ private void getFromYarnRestApi(Application application) throws Exception {
+ log.debug("[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi
starting...");
+ StopFrom stopFrom = getStopFrom(application);
OptionState optionState = OPTIONING.get(application.getId());
/*
@@ -500,7 +507,7 @@ public class FlinkRESTAPIWatcher {
*/
Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
if (flag != null) {
- log.info("FlinkRESTAPIWatcher previous state: canceling.");
+ log.info("[StreamPark][FlinkAppHttpWatcher] previous state: canceling.");
FlinkAppState flinkAppState = FlinkAppState.CANCELED;
try {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
@@ -511,7 +518,7 @@ public class FlinkRESTAPIWatcher {
} finally {
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "FlinkRESTAPIWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
+ "[StreamPark][FlinkAppHttpWatcher] query previous state was
canceling and stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
if (flinkAppState == FlinkAppState.KILLED || flinkAppState ==
FlinkAppState.FAILED) {
alertService.alert(application, flinkAppState);
@@ -528,7 +535,8 @@ public class FlinkRESTAPIWatcher {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo == null) {
if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
- throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi
failed ");
+ throw new RuntimeException(
+ "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi failed ");
}
} else {
try {
@@ -540,7 +548,7 @@ public class FlinkRESTAPIWatcher {
if (FlinkAppState.KILLED.equals(flinkAppState)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "FlinkRESTAPIWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
+ "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job
was killed and stopFrom NotFound,savePoint expired!");
savePointService.expire(application.getId());
}
flinkAppState = FlinkAppState.CANCELED;
@@ -565,7 +573,8 @@ public class FlinkRESTAPIWatcher {
}
} catch (Exception e) {
if
(!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
- throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi
error,", e);
+ throw new RuntimeException(
+ "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi error,",
e);
}
}
}
@@ -589,7 +598,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkRESTAPIWatcher setOptioning");
+ log.info("[StreamPark][FlinkAppHttpWatcher] setOptioning");
OPTIONING.put(appId, state);
if (state.equals(OptionState.CANCELLING)) {
STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
@@ -600,7 +609,7 @@ public class FlinkRESTAPIWatcher {
if (application.isKubernetesModeJob()) {
return;
}
- log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}",
application.getId());
+ log.info("[StreamPark][FlinkAppHttpWatcher] add app to tracking,appId:{}",
application.getId());
WATCHING_APPS.put(application.getId(), application);
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
}
@@ -609,7 +618,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkRESTAPIWatcher add app to savepoint,appId:{}", appId);
+ log.info("[StreamPark][FlinkAppHttpWatcher] add app to
savepoint,appId:{}", appId);
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
}
@@ -617,7 +626,7 @@ public class FlinkRESTAPIWatcher {
if (isKubernetesApp(appId)) {
return;
}
- log.info("FlinkRESTAPIWatcher stop app,appId:{}", appId);
+ log.info("[StreamPark][FlinkAppHttpWatcher] stop app,appId:{}", appId);
WATCHING_APPS.remove(appId);
}
@@ -736,13 +745,14 @@ public class FlinkRESTAPIWatcher {
}
private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException
{
- String result = YarnUtils.restRequest(url);
+ String result = YarnUtils.restRequest(url, HTTP_TIMEOUT);
return JacksonUtils.read(result, clazz);
}
private <T> T httpRestRequest(String url, Class<T> clazz) throws IOException
{
String result =
- HttpClientUtils.httpGetRequest(url,
RequestConfig.custom().setConnectTimeout(5000).build());
+ HttpClientUtils.httpGetRequest(
+ url,
RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build());
if (null == result) {
return null;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 9cc077718..55053f1f0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.task;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.enums.OptionState;
@@ -37,15 +36,12 @@ import
org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.Date;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import scala.Enumeration;
@@ -54,6 +50,7 @@ import static
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8
/** Event Listener for K8sFlinkTrackMonitor */
@Component
+@Slf4j
public class FlinkK8sChangeEventListener {
@Lazy @Autowired private ApplicationService applicationService;
@@ -62,16 +59,6 @@ public class FlinkK8sChangeEventListener {
@Lazy @Autowired private CheckpointProcessor checkpointProcessor;
- private final ExecutorService executor =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 20L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-notify-executor"),
- new ThreadPoolExecutor.AbortPolicy());
-
/**
* Catch FlinkJobStatusChangeEvent then storage it persistently to db.
Actually update
* org.apache.streampark.console.core.entity.Application records.
@@ -98,7 +85,7 @@ public class FlinkK8sChangeEventListener {
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
|| FlinkAppState.FINISHED.equals(state)) {
- executor.execute(() -> alertService.alert(app, state));
+ alertService.alert(app, state);
}
}