This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch dev-2.1.4 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit aba9eb6b4ad1ab98f4e95226637d12d336fd4431
Author: benjobs <[email protected]>
AuthorDate: Mon Apr 15 21:44:04 2024 +0800

    [Improve] ThreadPoolExecutor param improvement
---
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  2 +-
 .../flink/kubernetes/ChangeEventBus.scala          | 12 +++++++---
 .../flink/kubernetes/KubernetesRetriever.scala     | 27 ++++++++++++----------
 3 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index bcacd21ba..f63f21b37 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -104,7 +104,7 @@ public class FlinkK8sWatcherWrapper {
     }
     // filter out the application that should be tracking
     return k8sApplication.stream()
-        .filter(app -> FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+        .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
         .map(this::toTrackId)
         .collect(Collectors.toList());
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
index e7d8d3404..ea1749564 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
@@ -17,18 +17,24 @@
 
 package org.apache.streampark.flink.kubernetes
 
+import org.apache.streampark.common.util.ThreadUtils
+
 import com.google.common.eventbus.{AsyncEventBus, EventBus}
 
 import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 
 class ChangeEventBus {
 
+  private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)
+
   private val execPool = new ThreadPoolExecutor(
-    Runtime.getRuntime.availableProcessors * 5,
-    Runtime.getRuntime.availableProcessors * 10,
+    CPU_NUM,
+    CPU_NUM * 5,
     60L,
     TimeUnit.SECONDS,
-    new LinkedBlockingQueue[Runnable](1024))
+    new LinkedBlockingQueue[Runnable],
+    ThreadUtils.threadFactory("streampark-k8s-watching-thread")
+  )
 
   private[kubernetes] val asyncEventBus =
     new AsyncEventBus("[StreamPark][flink-k8s]AsyncEventBus", execPool)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 47bfd62ea..da6870f17 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -87,18 +87,21 @@ object KubernetesRetriever extends Logger {
     val clientFactory: ClusterClientFactory[String] =
       clusterClientServiceLoader.getClusterClientFactory(flinkConfig)
 
-    val clusterProvider: KubernetesClusterDescriptor =
-      clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[KubernetesClusterDescriptor]
-
-    Try {
-      clusterProvider
-        .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
-        .getClusterClient
-    } match {
-      case Success(v) => Some(v)
-      case Failure(e) =>
-        logError(s"Get flinkClient error, the error is: $e")
-        None
+    Utils.using(
+      clientFactory
+        .createClusterDescriptor(flinkConfig)
+        .asInstanceOf[KubernetesClusterDescriptor]) {
+      clusterProvider =>
+        Try {
+          clusterProvider
+            .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+            .getClusterClient
+        } match {
+          case Success(v) => Some(v)
+          case Failure(e) =>
+            logError(s"Get flinkClient error, the error is: $e")
+            None
+        }
     }
   }
