This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 90cd46f50a [Improve][EventService] improve event code and extract 
event code to EventService (#7153)
90cd46f50a is described below

commit 90cd46f50ac2f01e11ac5b7002688ea6c657cc82
Author: Eric <[email protected]>
AuthorDate: Thu Sep 5 18:25:38 2024 +0800

    [Improve][EventService] improve event code and extract event code to 
EventService (#7153)
---
 .../seatunnel/engine/server/EventService.java      | 100 +++++++++++++++++++++
 .../seatunnel/engine/server/SeaTunnelServer.java   |  10 ++-
 .../engine/server/TaskExecutionService.java        |  55 ++----------
 3 files changed, 114 insertions(+), 51 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java
new file mode 100644
index 0000000000..0c7b654b21
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Slf4j
+public class EventService {
+    private final BlockingQueue<Event> eventBuffer;
+
+    private ExecutorService eventForwardService;
+
+    private final NodeEngineImpl nodeEngine;
+
+    public EventService(NodeEngineImpl nodeEngine) {
+        eventBuffer = new ArrayBlockingQueue<>(2048);
+        initEventForwardService();
+        this.nodeEngine = nodeEngine;
+    }
+
+    private void initEventForwardService() {
+        eventForwardService =
+                Executors.newSingleThreadExecutor(
+                        new 
ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
+        eventForwardService.submit(
+                () -> {
+                    List<Event> events = new ArrayList<>();
+                    RetryUtils.RetryMaterial retryMaterial =
+                            new RetryUtils.RetryMaterial(2, true, e -> true);
+                    while (!Thread.currentThread().isInterrupted()) {
+                        try {
+                            events.clear();
+
+                            Event first = eventBuffer.take();
+                            events.add(first);
+
+                            eventBuffer.drainTo(events, 500);
+                            JobEventReportOperation operation = new 
JobEventReportOperation(events);
+
+                            RetryUtils.retryWithException(
+                                    () ->
+                                            
NodeEngineUtil.sendOperationToMasterNode(
+                                                            nodeEngine, 
operation)
+                                                    .join(),
+                                    retryMaterial);
+
+                            log.debug("Event forward success, events " + 
events.size());
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            log.info("Event forward thread interrupted");
+                        } catch (Throwable t) {
+                            log.warn("Event forward failed, discard events " + 
events.size(), t);
+                        }
+                    }
+                });
+    }
+
+    public void reportEvent(Event e) {
+        while (!eventBuffer.offer(e)) {
+            eventBuffer.poll();
+            log.warn("Event buffer is full, discard the oldest event");
+        }
+    }
+
+    public void shutdownNow() {
+        if (eventForwardService != null) {
+            eventForwardService.shutdownNow();
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index b76af4c19a..99cd27d564 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -76,6 +76,8 @@ public class SeaTunnelServer
 
     private volatile boolean isRunning = true;
 
+    @Getter private EventService eventService;
+
     public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
         this.liveOperationRegistry = new LiveOperationRegistry();
         this.seaTunnelConfig = seaTunnelConfig;
@@ -116,6 +118,8 @@ public class SeaTunnelServer
                 new DefaultClassLoaderService(
                         
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());
 
+        eventService = new EventService(nodeEngine);
+
         if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
                 == 
seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
             startWorker();
@@ -149,7 +153,7 @@ public class SeaTunnelServer
     private void startWorker() {
         taskExecutionService =
                 new TaskExecutionService(
-                        classLoaderService, nodeEngine, 
nodeEngine.getProperties());
+                        classLoaderService, nodeEngine, 
nodeEngine.getProperties(), eventService);
         
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
         taskExecutionService.start();
         getSlotService();
@@ -176,6 +180,10 @@ public class SeaTunnelServer
         if (coordinatorService != null) {
             coordinatorService.shutdown();
         }
+
+        if (eventService != null) {
+            eventService.shutdownNow();
+        }
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index b45544def1..b32dd7c6a9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.event.Event;
 import org.apache.seatunnel.api.tracing.MDCExecutorService;
 import org.apache.seatunnel.api.tracing.MDCTracer;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -32,7 +31,6 @@ import 
org.apache.seatunnel.engine.common.exception.JobNotFoundException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
-import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
 import 
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -51,12 +49,10 @@ import 
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClie
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
-import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.instance.impl.NodeState;
 import com.hazelcast.internal.metrics.DynamicMetricsProvider;
@@ -76,7 +72,6 @@ import lombok.SneakyThrows;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,7 +79,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -150,13 +144,13 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
     private final ServerConnectorPackageClient serverConnectorPackageClient;
 
-    private final BlockingQueue<Event> eventBuffer;
-    private final ExecutorService eventForwardService;
+    private final EventService eventService;
 
     public TaskExecutionService(
             ClassLoaderService classLoaderService,
             NodeEngineImpl nodeEngine,
-            HazelcastProperties properties) {
+            HazelcastProperties properties,
+            EventService eventService) {
         seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
         this.nodeEngine = nodeEngine;
@@ -179,42 +173,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         serverConnectorPackageClient =
                 new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
 
-        eventBuffer = new ArrayBlockingQueue<>(2048);
-        eventForwardService =
-                Executors.newSingleThreadExecutor(
-                        new 
ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
-        eventForwardService.submit(
-                () -> {
-                    List<Event> events = new ArrayList<>();
-                    RetryUtils.RetryMaterial retryMaterial =
-                            new RetryUtils.RetryMaterial(2, true, e -> true);
-                    while (!Thread.currentThread().isInterrupted()) {
-                        try {
-                            events.clear();
-
-                            Event first = eventBuffer.take();
-                            events.add(first);
-
-                            eventBuffer.drainTo(events, 500);
-                            JobEventReportOperation operation = new 
JobEventReportOperation(events);
-
-                            RetryUtils.retryWithException(
-                                    () ->
-                                            
NodeEngineUtil.sendOperationToMasterNode(
-                                                            nodeEngine, 
operation)
-                                                    .join(),
-                                    retryMaterial);
-
-                            logger.fine("Event forward success, events " + 
events.size());
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            logger.info("Event forward thread interrupted");
-                        } catch (Throwable t) {
-                            logger.warning(
-                                    "Event forward failed, discard events " + 
events.size(), t);
-                        }
-                    }
-                });
+        this.eventService = eventService;
     }
 
     public void start() {
@@ -225,7 +184,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         isRunning = false;
         executorService.shutdownNow();
         scheduledExecutorService.shutdown();
-        eventForwardService.shutdownNow();
     }
 
     public TaskGroupContext getExecutionContext(TaskGroupLocation 
taskGroupLocation) {
@@ -691,10 +649,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     }
 
     public void reportEvent(Event e) {
-        while (!eventBuffer.offer(e)) {
-            eventBuffer.poll();
-            logger.warning("Event buffer is full, discard the oldest event");
-        }
+        eventService.reportEvent(e);
     }
 
     private final class BlockingWorker implements Runnable {

Reply via email to