This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 90cd46f50a [Improve][EventService] improve event code and extract
event code to EventService (#7153)
90cd46f50a is described below
commit 90cd46f50ac2f01e11ac5b7002688ea6c657cc82
Author: Eric <[email protected]>
AuthorDate: Thu Sep 5 18:25:38 2024 +0800
[Improve][EventService] improve event code and extract event code to
EventService (#7153)
---
.../seatunnel/engine/server/EventService.java | 100 +++++++++++++++++++++
.../seatunnel/engine/server/SeaTunnelServer.java | 10 ++-
.../engine/server/TaskExecutionService.java | 55 ++----------
3 files changed, 114 insertions(+), 51 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java
new file mode 100644
index 0000000000..0c7b654b21
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Slf4j
+public class EventService {
+ private final BlockingQueue<Event> eventBuffer;
+
+ private ExecutorService eventForwardService;
+
+ private final NodeEngineImpl nodeEngine;
+
+ public EventService(NodeEngineImpl nodeEngine) {
+ eventBuffer = new ArrayBlockingQueue<>(2048);
+ initEventForwardService();
+ this.nodeEngine = nodeEngine;
+ }
+
+ private void initEventForwardService() {
+ eventForwardService =
+ Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
+ eventForwardService.submit(
+ () -> {
+ List<Event> events = new ArrayList<>();
+ RetryUtils.RetryMaterial retryMaterial =
+ new RetryUtils.RetryMaterial(2, true, e -> true);
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ events.clear();
+
+ Event first = eventBuffer.take();
+ events.add(first);
+
+ eventBuffer.drainTo(events, 500);
+ JobEventReportOperation operation = new JobEventReportOperation(events);
+
+ RetryUtils.retryWithException(
+ () ->
+ NodeEngineUtil.sendOperationToMasterNode(
+ nodeEngine, operation)
+ .join(),
+ retryMaterial);
+
+ log.debug("Event forward success, events " + events.size());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.info("Event forward thread interrupted");
+ } catch (Throwable t) {
+ log.warn("Event forward failed, discard events " + events.size(), t);
+ }
+ }
+ });
+ }
+
+ public void reportEvent(Event e) {
+ while (!eventBuffer.offer(e)) {
+ eventBuffer.poll();
+ log.warn("Event buffer is full, discard the oldest event");
+ }
+ }
+
+ public void shutdownNow() {
+ if (eventForwardService != null) {
+ eventForwardService.shutdownNow();
+ }
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index b76af4c19a..99cd27d564 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -76,6 +76,8 @@ public class SeaTunnelServer
private volatile boolean isRunning = true;
+ @Getter private EventService eventService;
+
public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
this.liveOperationRegistry = new LiveOperationRegistry();
this.seaTunnelConfig = seaTunnelConfig;
@@ -116,6 +118,8 @@ public class SeaTunnelServer
new DefaultClassLoaderService(
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());
+ eventService = new EventService(nodeEngine);
+
if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
startWorker();
@@ -149,7 +153,7 @@ public class SeaTunnelServer
private void startWorker() {
taskExecutionService =
new TaskExecutionService(
- classLoaderService, nodeEngine, nodeEngine.getProperties());
+ classLoaderService, nodeEngine, nodeEngine.getProperties(), eventService);
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
taskExecutionService.start();
getSlotService();
@@ -176,6 +180,10 @@ public class SeaTunnelServer
if (coordinatorService != null) {
coordinatorService.shutdown();
}
+
+ if (eventService != null) {
+ eventService.shutdownNow();
+ }
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index b45544def1..b32dd7c6a9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -32,7 +31,6 @@ import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
-import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -51,12 +49,10 @@ import org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClie
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
-import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.commons.collections4.CollectionUtils;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
@@ -76,7 +72,6 @@ import lombok.SneakyThrows;
import java.io.IOException;
import java.net.URL;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -84,7 +79,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -150,13 +144,13 @@ public class TaskExecutionService implements DynamicMetricsProvider {
private final ServerConnectorPackageClient serverConnectorPackageClient;
- private final BlockingQueue<Event> eventBuffer;
- private final ExecutorService eventForwardService;
+ private final EventService eventService;
public TaskExecutionService(
ClassLoaderService classLoaderService,
NodeEngineImpl nodeEngine,
- HazelcastProperties properties) {
+ HazelcastProperties properties,
+ EventService eventService) {
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
this.nodeEngine = nodeEngine;
@@ -179,42 +173,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
serverConnectorPackageClient =
new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
- eventBuffer = new ArrayBlockingQueue<>(2048);
- eventForwardService =
- Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
- eventForwardService.submit(
- () -> {
- List<Event> events = new ArrayList<>();
- RetryUtils.RetryMaterial retryMaterial =
- new RetryUtils.RetryMaterial(2, true, e -> true);
- while (!Thread.currentThread().isInterrupted()) {
- try {
- events.clear();
-
- Event first = eventBuffer.take();
- events.add(first);
-
- eventBuffer.drainTo(events, 500);
- JobEventReportOperation operation = new JobEventReportOperation(events);
-
- RetryUtils.retryWithException(
- () ->
- NodeEngineUtil.sendOperationToMasterNode(
- nodeEngine, operation)
- .join(),
- retryMaterial);
-
- logger.fine("Event forward success, events " + events.size());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.info("Event forward thread interrupted");
- } catch (Throwable t) {
- logger.warning(
- "Event forward failed, discard events " + events.size(), t);
- }
- }
- });
+ this.eventService = eventService;
}
public void start() {
@@ -225,7 +184,6 @@ public class TaskExecutionService implements DynamicMetricsProvider {
isRunning = false;
executorService.shutdownNow();
scheduledExecutorService.shutdown();
- eventForwardService.shutdownNow();
}
public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
@@ -691,10 +649,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
}
public void reportEvent(Event e) {
- while (!eventBuffer.offer(e)) {
- eventBuffer.poll();
- logger.warning("Event buffer is full, discard the oldest event");
- }
+ eventService.reportEvent(e);
}
private final class BlockingWorker implements Runnable {