This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 01159ec923 [Feature] Support listening for message delayed events in cdc source (#6634)
01159ec923 is described below

commit 01159ec923358fa13237245c050c1c851aaec07f
Author: hailin0 <[email protected]>
AuthorDate: Sun Apr 7 19:38:41 2024 +0800

    [Feature] Support listening for message delayed events in cdc source (#6634)
---
 .../org/apache/seatunnel/api/event/EventType.java  |  1 +
 .../api/source/event/MessageDelayedEvent.java      | 37 ++++++++----
 .../reader/IncrementalSourceRecordEmitter.java     | 10 ++++
 .../cdc/base/utils/MessageDelayedEventLimiter.java | 50 ++++++++++++++++
 .../base/utils/MessageDelayedEventLimiterTest.java | 70 ++++++++++++++++++++++
 .../container/seatunnel/SeaTunnelContainer.java    |  1 +
 .../engine/server/TaskExecutionService.java        | 55 +++++++++++++++++
 .../server/event/JobEventHttpReportHandler.java    |  4 +-
 .../engine/server/event/JobEventListener.java      |  4 +-
 .../server/event/JobEventReportOperation.java      | 13 ++--
 .../event/JobEventHttpReportHandlerTest.java       | 48 +++++++++------
 11 files changed, 255 insertions(+), 38 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
index 46acd316b4..edb1b72f36 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
@@ -29,4 +29,5 @@ public enum EventType {
     LIFECYCLE_READER_OPEN,
     LIFECYCLE_READER_CLOSE,
     LIFECYCLE_WRITER_CLOSE,
+    READER_MESSAGE_DELAYED,
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/event/MessageDelayedEvent.java
similarity index 55%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/event/MessageDelayedEvent.java
index 4e834b828c..f27cc7e9e5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/event/MessageDelayedEvent.java
@@ -15,24 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.event;
+package org.apache.seatunnel.api.source.event;
 
 import org.apache.seatunnel.api.event.Event;
-import org.apache.seatunnel.api.event.EventListener;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.api.event.EventType;
 
 import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
 
+@Getter
+@Setter
+@ToString
 @AllArgsConstructor
-public class JobEventListener implements EventListener {
-    private final TaskLocation taskLocation;
-    private final TaskExecutionContext taskExecutionContext;
+@NoArgsConstructor
+public class MessageDelayedEvent implements Event {
+    private long createdTime;
+    private String jobId;
+    private EventType eventType = EventType.READER_MESSAGE_DELAYED;
 
-    @Override
-    public void onEvent(Event event) {
-        event.setJobId(String.valueOf(taskLocation.getJobId()));
-        JobEventReportOperation evenCollectOperation = new 
JobEventReportOperation(event);
-        taskExecutionContext.sendToMaster(evenCollectOperation).join();
+    private long delayTime;
+    private String record;
+
+    public MessageDelayedEvent(long delayTime) {
+        this(delayTime, null);
+    }
+
+    public MessageDelayedEvent(long delayTime, String record) {
+        this.delayTime = delayTime;
+        this.record = record;
+        this.createdTime = System.currentTimeMillis();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index a98a9d0959..7c25d3ce5c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.metrics.Counter;
 import org.apache.seatunnel.api.event.EventListener;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.event.MessageDelayedEvent;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
@@ -28,6 +29,7 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import 
org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
 import 
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
+import 
org.apache.seatunnel.connectors.cdc.base.utils.MessageDelayedEventLimiter;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
 
@@ -35,6 +37,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -71,6 +74,8 @@ public class IncrementalSourceRecordEmitter<T>
     protected final Counter recordFetchDelay;
     protected final Counter recordEmitDelay;
     protected final EventListener eventListener;
+    protected final MessageDelayedEventLimiter delayedEventLimiter =
+            new MessageDelayedEventLimiter(Duration.ofSeconds(1), 0.5d);
 
     public IncrementalSourceRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -113,6 +118,11 @@ public class IncrementalSourceRecordEmitter<T>
             // report emit delay
             long emitDelay = now - messageTimestamp;
             recordEmitDelay.set(emitDelay > 0 ? emitDelay : 0);
+
+            // limit the emit event frequency
+            if (delayedEventLimiter.acquire(messageTimestamp)) {
+                eventListener.onEvent(new MessageDelayedEvent(emitDelay, 
element.toString()));
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiter.java
new file mode 100644
index 0000000000..af1c72035d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.utils;
+
+import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
+
+import lombok.AllArgsConstructor;
+
+import java.time.Duration;
+
+@AllArgsConstructor
+public class MessageDelayedEventLimiter {
+    private final long delayMs;
+    private final RateLimiter eventRateLimiter;
+
+    public MessageDelayedEventLimiter(Duration delayThreshold) {
+        this(delayThreshold, 1);
+    }
+
+    public MessageDelayedEventLimiter(Duration delayThreshold, double 
permitsPerSecond) {
+        this.delayMs = delayThreshold.toMillis();
+        this.eventRateLimiter = RateLimiter.create(permitsPerSecond);
+    }
+
+    public boolean acquire(long messageCreateTime) {
+        if (isDelayed(messageCreateTime)) {
+            return eventRateLimiter.tryAcquire();
+        }
+        return false;
+    }
+
+    private boolean isDelayed(long messageCreateTime) {
+        return delayMs != 0 && System.currentTimeMillis() - messageCreateTime 
>= delayMs;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiterTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiterTest.java
new file mode 100644
index 0000000000..5e0332ed95
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.utils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+public class MessageDelayedEventLimiterTest {
+
+    @Test
+    public void testAcquire() throws InterruptedException {
+        double permitsPerSecond = 0.5;
+        Duration delayThreshold = Duration.ofMillis(1000);
+        MessageDelayedEventLimiter delayedEventLimiter =
+                new MessageDelayedEventLimiter(delayThreshold, 
permitsPerSecond);
+
+        long endTime = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(10);
+        long actualAcquiredCount = 0;
+        while (System.currentTimeMillis() < endTime) {
+            boolean acquired =
+                    delayedEventLimiter.acquire(
+                            System.currentTimeMillis() - 
(delayThreshold.toMillis() * 10));
+            if (acquired) {
+                actualAcquiredCount++;
+            }
+            Thread.sleep(1);
+        }
+        long expectedAcquiredCount = (long) (TimeUnit.SECONDS.toSeconds(10) * 
permitsPerSecond);
+
+        Assertions.assertTrue(expectedAcquiredCount >= actualAcquiredCount);
+    }
+
+    @Test
+    public void testNoAcquire() throws InterruptedException {
+        double permitsPerSecond = 0.5;
+        Duration delayThreshold = Duration.ofMillis(1000);
+        MessageDelayedEventLimiter delayedEventLimiter =
+                new MessageDelayedEventLimiter(delayThreshold, 
permitsPerSecond);
+
+        long endTime = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(10);
+        long actualAcquiredCount = 0;
+        while (System.currentTimeMillis() < endTime) {
+            boolean acquired = 
delayedEventLimiter.acquire(System.currentTimeMillis());
+            if (acquired) {
+                actualAcquiredCount++;
+            }
+            Thread.sleep(1);
+        }
+
+        Assertions.assertTrue(actualAcquiredCount == 0);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index ef83f83257..ca55adbe89 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -250,6 +250,7 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                 || s.contains("Java2D Disposer")
                 || s.contains("OkHttp ConnectionPool")
                 || s.startsWith("http-report-event-scheduler")
+                || s.startsWith("event-forwarder")
                 || s.contains(
                         
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
                 || s.startsWith("Log4j2-TF-")
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 197833903d..049c9c374a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.api.common.metrics.MetricTags;
+import org.apache.seatunnel.api.event.Event;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -28,6 +30,7 @@ import 
org.apache.seatunnel.engine.common.exception.JobNotFoundException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
 import 
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -46,10 +49,12 @@ import 
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClie
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.hazelcast.instance.impl.NodeState;
 import com.hazelcast.internal.metrics.DynamicMetricsProvider;
 import com.hazelcast.internal.metrics.MetricDescriptor;
@@ -68,6 +73,7 @@ import lombok.SneakyThrows;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -75,6 +81,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -140,6 +147,9 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
     private final ServerConnectorPackageClient serverConnectorPackageClient;
 
+    private final BlockingQueue<Event> eventBuffer;
+    private final ExecutorService eventForwardService;
+
     public TaskExecutionService(
             ClassLoaderService classLoaderService,
             NodeEngineImpl nodeEngine,
@@ -165,6 +175,43 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
         serverConnectorPackageClient =
                 new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
+
+        eventBuffer = new ArrayBlockingQueue<>(2048);
+        eventForwardService =
+                Executors.newSingleThreadExecutor(
+                        new 
ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
+        eventForwardService.submit(
+                () -> {
+                    List<Event> events = new ArrayList<>();
+                    RetryUtils.RetryMaterial retryMaterial =
+                            new RetryUtils.RetryMaterial(2, true, e -> true);
+                    while (!Thread.currentThread().isInterrupted()) {
+                        try {
+                            events.clear();
+
+                            Event first = eventBuffer.take();
+                            events.add(first);
+
+                            eventBuffer.drainTo(events, 500);
+                            JobEventReportOperation operation = new 
JobEventReportOperation(events);
+
+                            RetryUtils.retryWithException(
+                                    () ->
+                                            
NodeEngineUtil.sendOperationToMasterNode(
+                                                            nodeEngine, 
operation)
+                                                    .join(),
+                                    retryMaterial);
+
+                            logger.fine("Event forward success, events " + 
events.size());
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            logger.info("Event forward thread interrupted");
+                        } catch (Throwable t) {
+                            logger.warning(
+                                    "Event forward failed, discard events " + 
events.size(), t);
+                        }
+                    }
+                });
     }
 
     public void start() {
@@ -175,6 +222,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         isRunning = false;
         executorService.shutdownNow();
         scheduledExecutorService.shutdown();
+        eventForwardService.shutdownNow();
     }
 
     public TaskGroupContext getExecutionContext(TaskGroupLocation 
taskGroupLocation) {
@@ -619,6 +667,13 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         }
     }
 
+    public void reportEvent(Event e) {
+        while (!eventBuffer.offer(e)) {
+            eventBuffer.poll();
+            logger.warning("Event buffer is full, discard the oldest event");
+        }
+    }
+
     private final class BlockingWorker implements Runnable {
 
         private final TaskTracker tracker;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
index f1e0fe9ac8..fab3201e5e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.event;
 
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.seatunnel.api.event.Event;
@@ -104,7 +105,8 @@ public class JobEventHttpReportHandler implements 
EventHandler {
         completionStage.toCompletableFuture().join();
     }
 
-    private void report() throws IOException {
+    @VisibleForTesting
+    synchronized void report() throws IOException {
         long headSequence = ringbuffer.headSequence();
         if (headSequence > committedEventIndex) {
             log.warn(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
index 4e834b828c..9b252d35c2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
@@ -32,7 +32,7 @@ public class JobEventListener implements EventListener {
     @Override
     public void onEvent(Event event) {
         event.setJobId(String.valueOf(taskLocation.getJobId()));
-        JobEventReportOperation evenCollectOperation = new 
JobEventReportOperation(event);
-        taskExecutionContext.sendToMaster(evenCollectOperation).join();
+
+        taskExecutionContext.getTaskExecutionService().reportEvent(event);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
index cc2500f3b3..9e3da06cf0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.event;
 
 import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventProcessor;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
@@ -33,24 +34,28 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.List;
 
 @NoArgsConstructor
 @AllArgsConstructor
 public class JobEventReportOperation extends Operation implements 
IdentifiedDataSerializable {
 
-    private Event event;
+    private List<Event> events;
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        server.getCoordinatorService().getEventProcessor().process(event);
+        EventProcessor processor = 
server.getCoordinatorService().getEventProcessor();
+        for (Event event : events) {
+            processor.process(event);
+        }
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
                 ObjectOutputStream objectOut = new 
ObjectOutputStream(byteOut)) {
-            objectOut.writeObject(event);
+            objectOut.writeObject(events);
             objectOut.flush();
             out.writeByteArray(byteOut.toByteArray());
         }
@@ -60,7 +65,7 @@ public class JobEventReportOperation extends Operation 
implements IdentifiedData
     protected void readInternal(ObjectDataInput in) throws IOException {
         try (ByteArrayInputStream byteIn = new 
ByteArrayInputStream(in.readByteArray());
                 ObjectInputStream objectIn = new ObjectInputStream(byteIn)) {
-            event = (Event) objectIn.readObject();
+            events = (List<Event>) objectIn.readObject();
         } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
index e5a978d486..72e0907490 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import com.hazelcast.config.Config;
+import com.hazelcast.config.RingbufferConfig;
+import com.hazelcast.config.RingbufferStoreConfig;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.ringbuffer.Ringbuffer;
@@ -38,38 +40,60 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import okio.Buffer;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.given;
 
+@Slf4j
 public class JobEventHttpReportHandlerTest {
+    private static final String ringBufferName = "test";
+    private static final int capacity = 1000;
     private static HazelcastInstance hazelcast;
     private static MockWebServer mockWebServer;
 
     @BeforeAll
     public static void before() throws IOException {
-        hazelcast = Hazelcast.newHazelcastInstance();
+        Config config = new Config();
+        config.setRingbufferConfigs(
+                Collections.singletonMap(
+                        ringBufferName,
+                        new RingbufferConfig(ringBufferName)
+                                .setCapacity(capacity)
+                                .setBackupCount(0)
+                                .setAsyncBackupCount(1)
+                                .setTimeToLiveSeconds(0)
+                                .setRingbufferStoreConfig(
+                                        new 
RingbufferStoreConfig().setEnabled(false))));
+        hazelcast = Hazelcast.newHazelcastInstance(config);
         mockWebServer = new MockWebServer();
         mockWebServer.start();
-        mockWebServer.enqueue(new MockResponse().setResponseCode(200));
+        for (int i = 0; i < capacity; i++) {
+            mockWebServer.enqueue(new MockResponse().setResponseCode(200));
+        }
     }
 
     @AfterAll
     public static void after() throws IOException {
         hazelcast.shutdown();
-        mockWebServer.shutdown();
+        try {
+            mockWebServer.shutdown();
+        } catch (Exception e) {
+            log.error("Failed to shutdown mockWebServer", e);
+        }
     }
 
     @Test
     public void testReportEvent() throws IOException, InterruptedException {
         int maxEvents = 1000;
-        Ringbuffer ringbuffer = createRingBuffer(maxEvents);
+        Ringbuffer ringbuffer = hazelcast.getRingbuffer(ringBufferName);
         JobEventHttpReportHandler handler =
                 new JobEventHttpReportHandler(
                         mockWebServer.url("/api").toString(), 
Duration.ofSeconds(1), ringbuffer);
@@ -80,6 +104,7 @@ public class JobEventHttpReportHandlerTest {
                 .await()
                 .atMost(10, TimeUnit.SECONDS)
                 .until(() -> mockWebServer.getRequestCount(), count -> count > 
0);
+        handler.report();
         handler.close();
 
         List<TestEvent> events = new ArrayList<>();
@@ -100,21 +125,6 @@ public class JobEventHttpReportHandlerTest {
         }
     }
 
-    private Ringbuffer createRingBuffer(int capacity) {
-        String ringBufferName = "test";
-        hazelcast
-                .getConfig()
-                .addRingBufferConfig(
-                        new Config()
-                                .getRingbufferConfig(ringBufferName)
-                                .setCapacity(capacity)
-                                .setBackupCount(0)
-                                .setAsyncBackupCount(1)
-                                .setTimeToLiveSeconds(0));
-        Ringbuffer ringbuffer = hazelcast.getRingbuffer(ringBufferName);
-        return ringbuffer;
-    }
-
     @Getter
     @Setter
     @NoArgsConstructor

Reply via email to