This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2d193887e0 [Fix][Zeta] Remove duplicate `LIFECYCLE_WRITER_CLOSE` events (#7984)
2d193887e0 is described below

commit 2d193887e02f77b4be51cc227b8feebfd55caafd
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 19 16:02:44 2024 +0800

    [Fix][Zeta] Remove duplicate `LIFECYCLE_WRITER_CLOSE` events (#7984)
    
    Co-authored-by: hailin0 <[email protected]>
---
 .../sink/multitablesink/MultiTableSinkWriter.java  |  5 ----
 .../FakeSourceToConsoleWithEventReportIT.java      | 29 ++++++++++++++--------
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 4a9dd34dae..89d7e8d13c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
-import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.tracing.MDCTracer;
@@ -327,10 +326,6 @@ public class MultiTableSinkWriter
                         (identifier, sinkWriter) -> {
                             try {
                                 sinkWriter.close();
-                                sinkWritersContext
-                                        .get(identifier)
-                                        .getEventListener()
-                                        .onEvent(new WriterCloseEvent());
                             } catch (Throwable e) {
                                 if (firstE[0] == null) {
                                     firstE[0] = e;
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
index 8e45bbf9de..79a4dbe1f8 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
@@ -44,7 +44,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -109,16 +109,23 @@ public class FakeSourceToConsoleWithEventReportIT extends SeaTunnelContainer {
                 arrayNode.elements().forEachRemaining(jsonNode -> events.add(jsonNode));
             }
         }
-        Assertions.assertEquals(10, events.size());
-        Set<String> eventTypes =
-                events.stream().map(e -> e.get("eventType").asText()).collect(Collectors.toSet());
+        Map<String, Integer> eventMap =
+                events.stream()
+                        .map(e -> e.get("eventType").asText())
+                        .collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
         Assertions.assertTrue(
-                eventTypes.containsAll(
-                        Arrays.asList(
-                                EventType.LIFECYCLE_ENUMERATOR_OPEN.name(),
-                                EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(),
-                                EventType.LIFECYCLE_READER_OPEN.name(),
-                                EventType.LIFECYCLE_READER_CLOSE.name(),
-                                EventType.LIFECYCLE_WRITER_CLOSE.name())));
+                eventMap.keySet()
+                        .containsAll(
+                                Arrays.asList(
+                                        EventType.LIFECYCLE_ENUMERATOR_OPEN.name(),
+                                        EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(),
+                                        EventType.LIFECYCLE_READER_OPEN.name(),
+                                        EventType.LIFECYCLE_READER_CLOSE.name(),
+                                        EventType.LIFECYCLE_WRITER_CLOSE.name())));
+        Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_READER_OPEN.name()));
+        Assertions.assertEquals(1, eventMap.get(EventType.LIFECYCLE_ENUMERATOR_OPEN.name()));
+        Assertions.assertEquals(1, eventMap.get(EventType.LIFECYCLE_ENUMERATOR_CLOSE.name()));
+        Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_READER_CLOSE.name()));
+        Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_WRITER_CLOSE.name()));
     }
 }

Reply via email to