This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b24a022f37 [Fix][Zeta] Adjust the timing of invoking the enumerator open method (#9092)
b24a022f37 is described below

commit b24a022f37d9f5105d9ca1d994910934fb463a0e
Author: Jia Fan <[email protected]>
AuthorDate: Mon Apr 14 09:33:11 2025 +0800

    [Fix][Zeta] Adjust the timing of invoking the enumerator open method (#9092)
---
 .../server/task/SourceSplitEnumeratorTask.java     |   4 +-
 .../server/task/SourceSplitEnumeratorTaskTest.java | 105 +++++++++++++++++++++
 .../org.mockito.plugins.MockMaker                  |  18 ++++
 3 files changed, 125 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 8004068ce6..2fb46d5dd0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -200,6 +200,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         } else {
             this.enumerator = 
this.source.getSource().createEnumerator(enumeratorContext);
         }
+        enumerator.open();
+        enumeratorContext.getEventListener().onEvent(new 
EnumeratorOpenEvent());
         restoreComplete.complete(null);
         log.debug("restoreState split enumerator [{}] finished", 
actionStateList);
     }
@@ -311,8 +313,6 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             case READY_START:
                 if (startCalled && readerRegisterComplete) {
                     currState = STARTING;
-                    enumerator.open();
-                    enumeratorContext.getEventListener().onEvent(new 
EnumeratorOpenEvent());
                 } else {
                     Thread.sleep(100);
                 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
new file mode 100644
index 0000000000..7beeedba53
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SourceSplitEnumeratorTaskTest {
+
+    @Test
+    void testOpenShouldBeforeReaderRegister() throws Exception {
+
+        SeaTunnelSource source = Mockito.mock(SeaTunnelSource.class);
+        SourceSplitEnumerator enumerator = 
Mockito.mock(SourceSplitEnumerator.class);
+        
Mockito.when(source.createEnumerator(Mockito.any())).thenReturn(enumerator);
+
+        AtomicLong openTime = new AtomicLong(0);
+        Mockito.doAnswer(
+                        answer -> {
+                            openTime.set(System.currentTimeMillis());
+                            return null;
+                        })
+                .when(enumerator)
+                .open();
+
+        AtomicLong registerReaderTime = new AtomicLong(0);
+        Mockito.doAnswer(
+                        answer -> {
+                            registerReaderTime.set(System.currentTimeMillis());
+                            return null;
+                        })
+                .when(enumerator)
+                .registerReader(Mockito.anyInt());
+
+        SourceAction action =
+                new SourceAction<>(1, "fake", source, new HashSet<>(), 
Collections.emptySet());
+        SourceSplitEnumeratorTask enumeratorTask =
+                new SourceSplitEnumeratorTask<>(
+                        1, new TaskLocation(new TaskGroupLocation(1, 1, 1), 1, 
1), action);
+
+        TaskExecutionContext context = 
Mockito.mock(TaskExecutionContext.class);
+        InvocationFuture future = Mockito.mock(InvocationFuture.class);
+        
Mockito.when(context.getOrCreateMetricsContext(Mockito.any())).thenReturn(null);
+        Mockito.when(context.sendToMaster(Mockito.any())).thenReturn(future);
+        Mockito.when(future.join()).thenReturn(null);
+        TaskExecutionService taskExecutionService = 
Mockito.mock(TaskExecutionService.class);
+        
Mockito.when(context.getTaskExecutionService()).thenReturn(taskExecutionService);
+
+        enumeratorTask.setTaskExecutionContext(context);
+
+        // re-order the method call to test the open() should be called before 
receivedReader()
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        Thread.sleep(1000);
+                        enumeratorTask.receivedReader(
+                                new TaskLocation(new TaskGroupLocation(1, 1, 
1), 1, 1),
+                                Address.createUnresolvedAddress("localhost", 
5701));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        enumeratorTask.init();
+        enumeratorTask.restoreState(new ArrayList<>());
+
+        while (openTime.get() == 0 || registerReaderTime.get() == 0) {
+            enumeratorTask.call();
+        }
+
+        Assertions.assertTrue(openTime.get() < registerReaderTime.get());
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000..5e48c7d280
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+mock-maker-inline
\ No newline at end of file

Reply via email to