This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2d193887e0 [Fix][Zeta] Remove duplicate `LIFECYCLE_WRITER_CLOSE`
events (#7984)
2d193887e0 is described below
commit 2d193887e02f77b4be51cc227b8feebfd55caafd
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 19 16:02:44 2024 +0800
[Fix][Zeta] Remove duplicate `LIFECYCLE_WRITER_CLOSE` events (#7984)
Co-authored-by: hailin0 <[email protected]>
---
.../sink/multitablesink/MultiTableSinkWriter.java | 5 ----
.../FakeSourceToConsoleWithEventReportIT.java | 29 ++++++++++++++--------
2 files changed, 18 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 4a9dd34dae..89d7e8d13c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -21,7 +21,6 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
-import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.tracing.MDCTracer;
@@ -327,10 +326,6 @@ public class MultiTableSinkWriter
(identifier, sinkWriter) -> {
try {
sinkWriter.close();
- sinkWritersContext
- .get(identifier)
- .getEventListener()
- .onEvent(new WriterCloseEvent());
} catch (Throwable e) {
if (firstE[0] == null) {
firstE[0] = e;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
index 8e45bbf9de..79a4dbe1f8 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
@@ -44,7 +44,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -109,16 +109,23 @@ public class FakeSourceToConsoleWithEventReportIT extends
SeaTunnelContainer {
arrayNode.elements().forEachRemaining(jsonNode ->
events.add(jsonNode));
}
}
- Assertions.assertEquals(10, events.size());
- Set<String> eventTypes =
- events.stream().map(e ->
e.get("eventType").asText()).collect(Collectors.toSet());
+ Map<String, Integer> eventMap =
+ events.stream()
+ .map(e -> e.get("eventType").asText())
+ .collect(Collectors.groupingBy(e -> e,
Collectors.summingInt(e -> 1)));
Assertions.assertTrue(
- eventTypes.containsAll(
- Arrays.asList(
- EventType.LIFECYCLE_ENUMERATOR_OPEN.name(),
- EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(),
- EventType.LIFECYCLE_READER_OPEN.name(),
- EventType.LIFECYCLE_READER_CLOSE.name(),
- EventType.LIFECYCLE_WRITER_CLOSE.name())));
+ eventMap.keySet()
+ .containsAll(
+ Arrays.asList(
+
EventType.LIFECYCLE_ENUMERATOR_OPEN.name(),
+
EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(),
+ EventType.LIFECYCLE_READER_OPEN.name(),
+
EventType.LIFECYCLE_READER_CLOSE.name(),
+
EventType.LIFECYCLE_WRITER_CLOSE.name())));
+ Assertions.assertEquals(2,
eventMap.get(EventType.LIFECYCLE_READER_OPEN.name()));
+ Assertions.assertEquals(1,
eventMap.get(EventType.LIFECYCLE_ENUMERATOR_OPEN.name()));
+ Assertions.assertEquals(1,
eventMap.get(EventType.LIFECYCLE_ENUMERATOR_CLOSE.name()));
+ Assertions.assertEquals(2,
eventMap.get(EventType.LIFECYCLE_READER_CLOSE.name()));
+ Assertions.assertEquals(2,
eventMap.get(EventType.LIFECYCLE_WRITER_CLOSE.name()));
}
}