This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 01159ec923 [Feature] Support listening for message delayed events in
cdc source (#6634)
01159ec923 is described below
commit 01159ec923358fa13237245c050c1c851aaec07f
Author: hailin0 <[email protected]>
AuthorDate: Sun Apr 7 19:38:41 2024 +0800
[Feature] Support listening for message delayed events in cdc source (#6634)
---
.../org/apache/seatunnel/api/event/EventType.java | 1 +
.../api/source/event/MessageDelayedEvent.java | 37 ++++++++----
.../reader/IncrementalSourceRecordEmitter.java | 10 ++++
.../cdc/base/utils/MessageDelayedEventLimiter.java | 50 ++++++++++++++++
.../base/utils/MessageDelayedEventLimiterTest.java | 70 ++++++++++++++++++++++
.../container/seatunnel/SeaTunnelContainer.java | 1 +
.../engine/server/TaskExecutionService.java | 55 +++++++++++++++++
.../server/event/JobEventHttpReportHandler.java | 4 +-
.../engine/server/event/JobEventListener.java | 4 +-
.../server/event/JobEventReportOperation.java | 13 ++--
.../event/JobEventHttpReportHandlerTest.java | 48 +++++++++------
11 files changed, 255 insertions(+), 38 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
index 46acd316b4..edb1b72f36 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
@@ -29,4 +29,5 @@ public enum EventType {
LIFECYCLE_READER_OPEN,
LIFECYCLE_READER_CLOSE,
LIFECYCLE_WRITER_CLOSE,
+ READER_MESSAGE_DELAYED,
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/event/MessageDelayedEvent.java
similarity index 55%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/event/MessageDelayedEvent.java
index 4e834b828c..f27cc7e9e5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/event/MessageDelayedEvent.java
@@ -15,24 +15,37 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.event;
+package org.apache.seatunnel.api.source.event;
import org.apache.seatunnel.api.event.Event;
-import org.apache.seatunnel.api.event.EventListener;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.api.event.EventType;
import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+@Getter
+@Setter
+@ToString
@AllArgsConstructor
-public class JobEventListener implements EventListener {
- private final TaskLocation taskLocation;
- private final TaskExecutionContext taskExecutionContext;
+@NoArgsConstructor
+public class MessageDelayedEvent implements Event { // event carrying the delay of a late source message; the CDC record emitter fires it when its delay limiter grants a permit
+ private long createdTime; // set to System.currentTimeMillis() by the (delayTime, record) constructor
+ private String jobId;
+ private EventType eventType = EventType.READER_MESSAGE_DELAYED; // default type for this event class
- @Override
- public void onEvent(Event event) {
- event.setJobId(String.valueOf(taskLocation.getJobId()));
- JobEventReportOperation evenCollectOperation = new
JobEventReportOperation(event);
- taskExecutionContext.sendToMaster(evenCollectOperation).join();
+ private long delayTime; // delay supplied by the caller; the CDC emitter passes (now - messageTimestamp)
+ private String record; // optional text describing the delayed record; null when the one-argument constructor is used
+
+ public MessageDelayedEvent(long delayTime) {
+ this(delayTime, null); // no record text
+ }
+
+ public MessageDelayedEvent(long delayTime, String record) {
+ this.delayTime = delayTime;
+ this.record = record;
+ this.createdTime = System.currentTimeMillis(); // stamp the creation time at construction
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index a98a9d0959..7c25d3ce5c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.event.MessageDelayedEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
@@ -28,6 +29,7 @@ import
org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import
org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
+import
org.apache.seatunnel.connectors.cdc.base.utils.MessageDelayedEventLimiter;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
@@ -35,6 +37,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import lombok.extern.slf4j.Slf4j;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -71,6 +74,8 @@ public class IncrementalSourceRecordEmitter<T>
protected final Counter recordFetchDelay;
protected final Counter recordEmitDelay;
protected final EventListener eventListener;
+ protected final MessageDelayedEventLimiter delayedEventLimiter =
+ new MessageDelayedEventLimiter(Duration.ofSeconds(1), 0.5d);
public IncrementalSourceRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -113,6 +118,11 @@ public class IncrementalSourceRecordEmitter<T>
// report emit delay
long emitDelay = now - messageTimestamp;
recordEmitDelay.set(emitDelay > 0 ? emitDelay : 0);
+
+ // limit the emit event frequency
+ if (delayedEventLimiter.acquire(messageTimestamp)) {
+ eventListener.onEvent(new MessageDelayedEvent(emitDelay,
element.toString()));
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiter.java
new file mode 100644
index 0000000000..af1c72035d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.utils;
+
+import
org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
+
+import lombok.AllArgsConstructor;
+
+import java.time.Duration;
+
+@AllArgsConstructor
+public class MessageDelayedEventLimiter { // gate for "message delayed" events: the message must be late AND the rate limiter must grant a permit
+ private final long delayMs; // delay threshold in milliseconds; 0 disables the check (see isDelayed)
+ private final RateLimiter eventRateLimiter; // caps how often acquire() can return true
+
+ public MessageDelayedEventLimiter(Duration delayThreshold) {
+ this(delayThreshold, 1); // default rate: 1 permit per second
+ }
+
+ public MessageDelayedEventLimiter(Duration delayThreshold, double
permitsPerSecond) {
+ this.delayMs = delayThreshold.toMillis(); // Duration -> milliseconds
+ this.eventRateLimiter = RateLimiter.create(permitsPerSecond);
+ }
+
+ public boolean acquire(long messageCreateTime) { // true only when the message is late and a permit is available right now
+ if (isDelayed(messageCreateTime)) {
+ return eventRateLimiter.tryAcquire(); // no-arg tryAcquire does not wait for a permit
+ }
+ return false; // message is not late enough, or the threshold is 0
+ }
+
+ private boolean isDelayed(long messageCreateTime) {
+ return delayMs != 0 && System.currentTimeMillis() - messageCreateTime
>= delayMs; // late when the message age, measured against the current wall clock, reaches the threshold
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiterTest.java
new file mode 100644
index 0000000000..5e0332ed95
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/utils/MessageDelayedEventLimiterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.utils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+public class MessageDelayedEventLimiterTest {
+
+ @Test
+ public void testAcquire() throws InterruptedException { // always-late messages: acquisitions must stay within the rate limit
+ double permitsPerSecond = 0.5; // i.e. one permit every 2 seconds
+ Duration delayThreshold = Duration.ofMillis(1000);
+ MessageDelayedEventLimiter delayedEventLimiter =
+ new MessageDelayedEventLimiter(delayThreshold,
permitsPerSecond);
+
+ long endTime = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(10); // poll the limiter for 10 seconds
+ long actualAcquiredCount = 0;
+ while (System.currentTimeMillis() < endTime) {
+ boolean acquired =
+ delayedEventLimiter.acquire(
+ System.currentTimeMillis() -
(delayThreshold.toMillis() * 10)); // timestamp 10x the threshold in the past, so the delay check always passes
+ if (acquired) {
+ actualAcquiredCount++;
+ }
+ Thread.sleep(1);
+ }
+ long expectedAcquiredCount = (long) (TimeUnit.SECONDS.toSeconds(10) *
permitsPerSecond); // 10 s * 0.5 permits/s = 5
+
+ Assertions.assertTrue(expectedAcquiredCount >= actualAcquiredCount); // only an upper bound is asserted; a lower count also passes
+ }
+
+ @Test
+ public void testNoAcquire() throws InterruptedException { // never-late messages: no acquisition may succeed
+ double permitsPerSecond = 0.5;
+ Duration delayThreshold = Duration.ofMillis(1000);
+ MessageDelayedEventLimiter delayedEventLimiter =
+ new MessageDelayedEventLimiter(delayThreshold,
permitsPerSecond);
+
+ long endTime = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(10); // poll the limiter for 10 seconds
+ long actualAcquiredCount = 0;
+ while (System.currentTimeMillis() < endTime) {
+ boolean acquired =
delayedEventLimiter.acquire(System.currentTimeMillis()); // current timestamp, so the message age is far below the 1000 ms threshold
+ if (acquired) {
+ actualAcquiredCount++;
+ }
+ Thread.sleep(1);
+ }
+
+ Assertions.assertTrue(actualAcquiredCount == 0); // the delay check must reject every call
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index ef83f83257..ca55adbe89 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -250,6 +250,7 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
|| s.contains("Java2D Disposer")
|| s.contains("OkHttp ConnectionPool")
|| s.startsWith("http-report-event-scheduler")
+ || s.startsWith("event-forwarder")
|| s.contains(
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
|| s.startsWith("Log4j2-TF-")
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 197833903d..049c9c374a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.api.common.metrics.MetricTags;
+import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -28,6 +30,7 @@ import
org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
import
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -46,10 +49,12 @@ import
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClie
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.commons.collections4.CollectionUtils;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
@@ -68,6 +73,7 @@ import lombok.SneakyThrows;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -75,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -140,6 +147,9 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
private final ServerConnectorPackageClient serverConnectorPackageClient;
+ private final BlockingQueue<Event> eventBuffer;
+ private final ExecutorService eventForwardService;
+
public TaskExecutionService(
ClassLoaderService classLoaderService,
NodeEngineImpl nodeEngine,
@@ -165,6 +175,43 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
serverConnectorPackageClient =
new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
+
+ eventBuffer = new ArrayBlockingQueue<>(2048);
+ eventForwardService =
+ Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
+ eventForwardService.submit( // long-running task: batches buffered events and forwards them to the master node
+ () -> {
+ List<Event> events = new ArrayList<>(); // batch list reused across iterations; cleared at the start of each one
+ RetryUtils.RetryMaterial retryMaterial =
+ new RetryUtils.RetryMaterial(2, true, e -> true); // NOTE(review): presumably 2 retries, retrying on any exception; confirm against RetryUtils.RetryMaterial
+ while (!Thread.currentThread().isInterrupted()) { // runs until the thread is interrupted, e.g. by eventForwardService.shutdownNow()
+ try {
+ events.clear();
+
+ Event first = eventBuffer.take(); // blocks until at least one event is queued
+ events.add(first);
+
+ eventBuffer.drainTo(events, 500); // then appends up to 500 more already-queued events without waiting
+ JobEventReportOperation operation = new
JobEventReportOperation(events);
+
+ RetryUtils.retryWithException(
+ () ->
+
NodeEngineUtil.sendOperationToMasterNode(
+ nodeEngine,
operation)
+ .join(), // wait for the operation to complete before taking the next batch
+ retryMaterial);
+
+ logger.fine("Event forward success, events " +
events.size());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag so the while condition ends the loop
+ logger.info("Event forward thread interrupted");
+ } catch (Throwable t) { // any other failure: the batch is logged and dropped (cleared next iteration), not re-queued
+ logger.warning(
+ "Event forward failed, discard events " +
events.size(), t);
+ }
+ }
+ });
}
public void start() {
@@ -175,6 +222,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
isRunning = false;
executorService.shutdownNow();
scheduledExecutorService.shutdown();
+ eventForwardService.shutdownNow();
}
public TaskGroupContext getExecutionContext(TaskGroupLocation
taskGroupLocation) {
@@ -619,6 +667,13 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
}
+ public void reportEvent(Event e) { // queue an event for the event-forwarder thread; never waits for free space
+ while (!eventBuffer.offer(e)) { // offer() returns false when the bounded buffer is full
+ eventBuffer.poll(); // drop the head (oldest) event to make room, then retry the offer
+ logger.warning("Event buffer is full, discard the oldest event");
+ }
+ }
+
private final class BlockingWorker implements Runnable {
private final TaskTracker tracker;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
index f1e0fe9ac8..fab3201e5e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.event;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.seatunnel.api.event.Event;
@@ -104,7 +105,8 @@ public class JobEventHttpReportHandler implements
EventHandler {
completionStage.toCompletableFuture().join();
}
- private void report() throws IOException {
+ @VisibleForTesting
+ synchronized void report() throws IOException {
long headSequence = ringbuffer.headSequence();
if (headSequence > committedEventIndex) {
log.warn(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
index 4e834b828c..9b252d35c2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventListener.java
@@ -32,7 +32,7 @@ public class JobEventListener implements EventListener {
@Override
public void onEvent(Event event) {
event.setJobId(String.valueOf(taskLocation.getJobId())); // stamp the event with this task's job id
- JobEventReportOperation evenCollectOperation = new
JobEventReportOperation(event);
- taskExecutionContext.sendToMaster(evenCollectOperation).join();
+
+ taskExecutionContext.getTaskExecutionService().reportEvent(event); // hand off to the TaskExecutionService event buffer instead of a blocking send-and-join to the master
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
index cc2500f3b3..9e3da06cf0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventReportOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.event;
import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventProcessor;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -33,24 +34,28 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
public class JobEventReportOperation extends Operation implements
IdentifiedDataSerializable {
- private Event event;
+ private List<Event> events;
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- server.getCoordinatorService().getEventProcessor().process(event);
+ EventProcessor processor =
server.getCoordinatorService().getEventProcessor(); // look up the processor once for the whole batch
+ for (Event event : events) { // process the batched events one by one, in list order
+ processor.process(event);
+ }
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
ObjectOutputStream objectOut = new
ObjectOutputStream(byteOut)) {
- objectOut.writeObject(event);
+ objectOut.writeObject(events);
objectOut.flush();
out.writeByteArray(byteOut.toByteArray());
}
@@ -60,7 +65,7 @@ public class JobEventReportOperation extends Operation
implements IdentifiedData
protected void readInternal(ObjectDataInput in) throws IOException {
try (ByteArrayInputStream byteIn = new
ByteArrayInputStream(in.readByteArray());
ObjectInputStream objectIn = new ObjectInputStream(byteIn)) {
- event = (Event) objectIn.readObject();
+ events = (List<Event>) objectIn.readObject();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
index e5a978d486..72e0907490 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandlerTest.java
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.hazelcast.config.Config;
+import com.hazelcast.config.RingbufferConfig;
+import com.hazelcast.config.RingbufferStoreConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
@@ -38,38 +40,60 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import okio.Buffer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.given;
+@Slf4j
public class JobEventHttpReportHandlerTest {
+ private static final String ringBufferName = "test";
+ private static final int capacity = 1000;
private static HazelcastInstance hazelcast;
private static MockWebServer mockWebServer;
@BeforeAll
public static void before() throws IOException {
- hazelcast = Hazelcast.newHazelcastInstance();
+ Config config = new Config(); // ring buffer is now configured up front and passed to newHazelcastInstance, not added to a running instance
+ config.setRingbufferConfigs(
+ Collections.singletonMap(
+ ringBufferName,
+ new RingbufferConfig(ringBufferName)
+ .setCapacity(capacity)
+ .setBackupCount(0)
+ .setAsyncBackupCount(1)
+ .setTimeToLiveSeconds(0)
+ .setRingbufferStoreConfig(
+ new
RingbufferStoreConfig().setEnabled(false)))); // ring-buffer store explicitly disabled
+ hazelcast = Hazelcast.newHazelcastInstance(config);
mockWebServer = new MockWebServer();
mockWebServer.start();
- mockWebServer.enqueue(new MockResponse().setResponseCode(200));
+ for (int i = 0; i < capacity; i++) { // enqueue `capacity` 200 responses instead of one; presumably so repeated report calls always find a queued response
+ mockWebServer.enqueue(new MockResponse().setResponseCode(200));
+ }
}
@AfterAll
public static void after() throws IOException {
hazelcast.shutdown();
- mockWebServer.shutdown();
+ try {
+ mockWebServer.shutdown();
+ } catch (Exception e) { // best-effort teardown: a shutdown failure is logged and not rethrown
+ log.error("Failed to shutdown mockWebServer", e);
+ }
}
@Test
public void testReportEvent() throws IOException, InterruptedException {
int maxEvents = 1000;
- Ringbuffer ringbuffer = createRingBuffer(maxEvents);
+ Ringbuffer ringbuffer = hazelcast.getRingbuffer(ringBufferName);
JobEventHttpReportHandler handler =
new JobEventHttpReportHandler(
mockWebServer.url("/api").toString(),
Duration.ofSeconds(1), ringbuffer);
@@ -80,6 +104,7 @@ public class JobEventHttpReportHandlerTest {
.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> mockWebServer.getRequestCount(), count -> count >
0);
+ handler.report();
handler.close();
List<TestEvent> events = new ArrayList<>();
@@ -100,21 +125,6 @@ public class JobEventHttpReportHandlerTest {
}
}
- private Ringbuffer createRingBuffer(int capacity) {
- String ringBufferName = "test";
- hazelcast
- .getConfig()
- .addRingBufferConfig(
- new Config()
- .getRingbufferConfig(ringBufferName)
- .setCapacity(capacity)
- .setBackupCount(0)
- .setAsyncBackupCount(1)
- .setTimeToLiveSeconds(0));
- Ringbuffer ringbuffer = hazelcast.getRingbuffer(ringBufferName);
- return ringbuffer;
- }
-
@Getter
@Setter
@NoArgsConstructor