This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8cade0cd0 Add @AllowConcurrentEvents annotation (#2644)
8cade0cd0 is described below

commit 8cade0cd0df1290e269d666be553c42b8d93554a
Author: BIN <[email protected]>
AuthorDate: Sat Apr 22 13:55:37 2023 +0800

    Add @AllowConcurrentEvents annotation (#2644)
    
    * Add @AllowConcurrentEvents annotation
    
    * Adjust thread pool parameters
---
 .idea/icon.png                                          | Bin 50140 -> 0 bytes
 .../console/core/task/FlinkK8sChangeEventListener.java  |   4 ++++
 .../streampark/flink/kubernetes/ChangeEventBus.scala    |   4 ++--
 .../flink/kubernetes/DefaultFlinkK8sWatcher.scala       |   6 ++++--
 4 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/.idea/icon.png b/.idea/icon.png
deleted file mode 100644
index 574278cee..000000000
Binary files a/.idea/icon.png and /dev/null differ
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 0d7fead67..a9e9962f9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -35,6 +35,7 @@ import org.apache.streampark.flink.kubernetes.model.JobStatusCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
@@ -76,6 +77,7 @@ public class FlinkK8sChangeEventListener {
    * org.apache.streampark.console.core.entity.Application records.
    */
   @SuppressWarnings("UnstableApiUsage")
+  @AllowConcurrentEvents
   @Subscribe
   public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) {
     JobStatusCV jobStatus = event.jobStatus();
@@ -104,6 +106,7 @@ public class FlinkK8sChangeEventListener {
    * org.apache.streampark.console.core.entity.Application records.
    */
   @SuppressWarnings("UnstableApiUsage")
+  @AllowConcurrentEvents
   @Subscribe
   public void subscribeMetricsChange(FlinkClusterMetricChangeEvent event) {
     TrackId trackId = event.trackId();
@@ -129,6 +132,7 @@ public class FlinkK8sChangeEventListener {
   }
 
   @SuppressWarnings("UnstableApiUsage")
+  @AllowConcurrentEvents
   @Subscribe
   public void subscribeCheckpointChange(FlinkJobCheckpointChangeEvent event) {
     CheckPoints.CheckPoint completed = new CheckPoints.CheckPoint();
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
index bf09ef444..e7d8d3404 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
@@ -24,11 +24,11 @@ import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 class ChangeEventBus {
 
   private val execPool = new ThreadPoolExecutor(
+    Runtime.getRuntime.availableProcessors * 5,
     Runtime.getRuntime.availableProcessors * 10,
-    Runtime.getRuntime.availableProcessors * 20,
     60L,
     TimeUnit.SECONDS,
-    new LinkedBlockingQueue[Runnable](2048))
+    new LinkedBlockingQueue[Runnable](1024))
 
   private[kubernetes] val asyncEventBus =
     new AsyncEventBus("[StreamPark][flink-k8s]AsyncEventBus", execPool)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index 19e2c9454..e9a3b06e1 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobState
 import org.apache.streampark.flink.kubernetes.model._
 import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher, FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher}
 
-import com.google.common.eventbus.Subscribe
+import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe}
 
 import javax.annotation.Nullable
 
@@ -125,7 +125,9 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
      * FlinkJobStatusChangeEvent.
      */
     // noinspection UnstableApiUsage
-    @Subscribe def subscribeFlinkJobStateEvent(event: FlinkJobStateEvent): Unit = {
+    @Subscribe
+    @AllowConcurrentEvents
+    def subscribeFlinkJobStateEvent(event: FlinkJobStateEvent): Unit = {
       if (event.trackId.isLegal) {
         val latest = watchController.jobStatuses.get(event.trackId)
         // determine if the current event should be ignored

Reply via email to