This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 8cade0cd0 Add @AllowConcurrentEvents annotation (#2644)
8cade0cd0 is described below
commit 8cade0cd0df1290e269d666be553c42b8d93554a
Author: BIN <[email protected]>
AuthorDate: Sat Apr 22 13:55:37 2023 +0800
Add @AllowConcurrentEvents annotation (#2644)
* Add @AllowConcurrentEvents annotation
* Adjust thread pool parameters
---
.idea/icon.png | Bin 50140 -> 0 bytes
.../console/core/task/FlinkK8sChangeEventListener.java | 4 ++++
.../streampark/flink/kubernetes/ChangeEventBus.scala | 4 ++--
.../flink/kubernetes/DefaultFlinkK8sWatcher.scala | 6 ++++--
4 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/.idea/icon.png b/.idea/icon.png
deleted file mode 100644
index 574278cee..000000000
Binary files a/.idea/icon.png and /dev/null differ
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 0d7fead67..a9e9962f9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -35,6 +35,7 @@ import org.apache.streampark.flink.kubernetes.model.JobStatusCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
@@ -76,6 +77,7 @@ public class FlinkK8sChangeEventListener {
* org.apache.streampark.console.core.entity.Application records.
*/
@SuppressWarnings("UnstableApiUsage")
+ @AllowConcurrentEvents
@Subscribe
public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) {
JobStatusCV jobStatus = event.jobStatus();
@@ -104,6 +106,7 @@ public class FlinkK8sChangeEventListener {
* org.apache.streampark.console.core.entity.Application records.
*/
@SuppressWarnings("UnstableApiUsage")
+ @AllowConcurrentEvents
@Subscribe
public void subscribeMetricsChange(FlinkClusterMetricChangeEvent event) {
TrackId trackId = event.trackId();
@@ -129,6 +132,7 @@ public class FlinkK8sChangeEventListener {
}
@SuppressWarnings("UnstableApiUsage")
+ @AllowConcurrentEvents
@Subscribe
public void subscribeCheckpointChange(FlinkJobCheckpointChangeEvent event) {
CheckPoints.CheckPoint completed = new CheckPoints.CheckPoint();
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
index bf09ef444..e7d8d3404 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
@@ -24,11 +24,11 @@ import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
class ChangeEventBus {
private val execPool = new ThreadPoolExecutor(
+ Runtime.getRuntime.availableProcessors * 5,
Runtime.getRuntime.availableProcessors * 10,
- Runtime.getRuntime.availableProcessors * 20,
60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue[Runnable](2048))
+ new LinkedBlockingQueue[Runnable](1024))
private[kubernetes] val asyncEventBus =
new AsyncEventBus("[StreamPark][flink-k8s]AsyncEventBus", execPool)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index 19e2c9454..e9a3b06e1 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobState
import org.apache.streampark.flink.kubernetes.model._
import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher,
FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher}
-import com.google.common.eventbus.Subscribe
+import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe}
import javax.annotation.Nullable
@@ -125,7 +125,9 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
* FlinkJobStatusChangeEvent.
*/
// noinspection UnstableApiUsage
- @Subscribe def subscribeFlinkJobStateEvent(event: FlinkJobStateEvent): Unit = {
+ @Subscribe
+ @AllowConcurrentEvents
+ def subscribeFlinkJobStateEvent(event: FlinkJobStateEvent): Unit = {
if (event.trackId.isLegal) {
val latest = watchController.jobStatuses.get(event.trackId)
// determine if the current event should be ignored