This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e410fa81e [feature][engine] Support for handling source reader events (#3665)
e410fa81e is described below

commit e410fa81ee4e0be9ad7909e964ffd9e541b3da93
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Dec 7 21:23:42 2022 +0800

    [feature][engine] Support for handling source reader events (#3665)
---
 .../api/configuration/ReadonlyConfig.java          |  5 +-
 .../serializable/TaskDataSerializerHook.java       |  5 ++
 .../server/task/SourceSplitEnumeratorTask.java     |  9 ++-
 .../server/task/context/SourceReaderContext.java   |  2 +-
 .../server/task/flow/SourceFlowLifeCycle.java      | 13 +++++
 .../operation/source/RequestSplitOperation.java    |  2 +-
 .../operation/source/SourceEventOperation.java     | 64 ++++++++++++++++++++
 ...ration.java => SourceReaderEventOperation.java} | 68 ++++++----------------
 8 files changed, 113 insertions(+), 55 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index e82fbdeac..4f733e88e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -29,13 +29,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-public class ReadonlyConfig {
-
+public class ReadonlyConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
     private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
 
     /**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index da7eb035d..c1586f2b7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -36,6 +36,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNo
 import 
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
+import 
org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
@@ -83,6 +84,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int CLEAN_TASKGROUP_CONTEXT_OPERATION = 19;
 
+    public static final int SOURCE_READER_EVENT_OPERATOR = 20;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -141,6 +144,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                     return new GetTaskGroupMetricsOperation();
                 case CLEAN_TASKGROUP_CONTEXT_OPERATION:
                     return new CleanTaskGroupContextOperation();
+                case SOURCE_READER_EVENT_OPERATOR:
+                    return new SourceReaderEventOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index d616cc273..bd0f2dcff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -27,6 +27,7 @@ import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
 import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.WAITING_RESTORE;
 
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -174,8 +175,12 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         }
     }
 
-    public void requestSplit(long taskID) {
-        enumerator.handleSplitRequest((int) taskID);
+    public void requestSplit(long taskIndex) {
+        enumerator.handleSplitRequest((int) taskIndex);
+    }
+
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        enumerator.handleSourceEvent(subtaskId, sourceEvent);
     }
 
     public void addTaskMemberMapping(TaskLocation taskID, Address memberAdder) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
index ef9678080..47830ef26 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
@@ -59,6 +59,6 @@ public class SourceReaderContext implements SourceReader.Context {
 
     @Override
     public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
-
+        sourceActionLifeCycle.sendSourceEventToEnumerator(sourceEvent);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b14fb7e57..826e508c2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -21,6 +21,7 @@ import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
 import static org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
 
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.Record;
@@ -36,6 +37,7 @@ import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOper
 import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
+import org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
@@ -146,6 +148,17 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         }
     }
 
+    public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
+        try {
+            runningTask.getExecutionContext().sendToMember(
+                new SourceReaderEventOperation(enumeratorTaskLocation, currentTaskLocation, sourceEvent),
+                enumeratorTaskAddress).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOGGER.warning("source request split failed", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     public void receivedSplits(List<SplitT> splits) {
         if (splits.isEmpty()) {
             reader.handleNoMoreSplits();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index c540ec528..dcad06c96 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -51,7 +51,7 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
 
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task = server.getTaskExecutionService().getTask(enumeratorTaskID);
-            task.requestSplit(taskID.getTaskID());
+            task.requestSplit(taskID.getTaskIndex());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
             exception -> exception instanceof NullPointerException &&
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
new file mode 100644
index 000000000..e62865993
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+
+public abstract class SourceEventOperation extends TaskOperation {
+    protected TaskLocation currentTaskLocation;
+
+    protected SourceEvent sourceEvent;
+
+    public SourceEventOperation() {
+    }
+
+    public SourceEventOperation(TaskLocation targetTaskLocation,
+                                TaskLocation currentTaskLocation,
+                                SourceEvent event) {
+        super(targetTaskLocation);
+        this.currentTaskLocation = currentTaskLocation;
+        this.sourceEvent = event;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(currentTaskLocation);
+        out.writeObject(sourceEvent);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        currentTaskLocation = in.readObject();
+        sourceEvent = in.readObject();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index c540ec528..97e873220 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.operation.source;
 
+import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -24,66 +25,35 @@ import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import com.hazelcast.spi.impl.operationservice.Operation;
-
-import java.io.IOException;
-
-public class RequestSplitOperation extends Operation implements IdentifiedDataSerializable {
-
-    private TaskLocation enumeratorTaskID;
-
-    private TaskLocation taskID;
+/**
+ * For {@link org.apache.seatunnel.api.source.SourceReader} send event to
+ * the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}
+ */
+public class SourceReaderEventOperation extends SourceEventOperation {
+    public SourceReaderEventOperation() {
+    }
 
-    public RequestSplitOperation() {
+    public SourceReaderEventOperation(TaskLocation targetTaskLocation,
+                                      TaskLocation currentTaskLocation,
+                                      SourceEvent event) {
+        super(targetTaskLocation, currentTaskLocation, event);
     }
 
-    public RequestSplitOperation(TaskLocation taskID, TaskLocation enumeratorTaskID) {
-        this.enumeratorTaskID = enumeratorTaskID;
-        this.taskID = taskID;
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.SOURCE_READER_EVENT_OPERATOR;
     }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-
         RetryUtils.retryWithException(() -> {
-            SourceSplitEnumeratorTask<?> task = server.getTaskExecutionService().getTask(enumeratorTaskID);
-            task.requestSplit(taskID.getTaskID());
+            SourceSplitEnumeratorTask<?> task =
+                server.getTaskExecutionService().getTask(taskLocation);
+            task.handleSourceEvent(currentTaskLocation.getTaskIndex(), sourceEvent);
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
             exception -> exception instanceof NullPointerException &&
-                !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
-    }
-
-    @Override
-    public String getServiceName() {
-        return SeaTunnelServer.SERVICE_NAME;
-    }
-
-    @Override
-    protected void writeInternal(ObjectDataOutput out) throws IOException {
-        super.writeInternal(out);
-        out.writeObject(taskID);
-        out.writeObject(enumeratorTaskID);
-    }
-
-    @Override
-    protected void readInternal(ObjectDataInput in) throws IOException {
-        super.readInternal(in);
-        taskID = in.readObject();
-        enumeratorTaskID = in.readObject();
-    }
-
-    @Override
-    public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
-    }
-
-    @Override
-    public int getClassId() {
-        return TaskDataSerializerHook.REQUEST_SPLIT_TYPE;
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }

Reply via email to