This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e410fa81e [feature][engine] Support for handling source reader events (#3665)
e410fa81e is described below
commit e410fa81ee4e0be9ad7909e964ffd9e541b3da93
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Dec 7 21:23:42 2022 +0800
[feature][engine] Support for handling source reader events (#3665)
---
.../api/configuration/ReadonlyConfig.java | 5 +-
.../serializable/TaskDataSerializerHook.java | 5 ++
.../server/task/SourceSplitEnumeratorTask.java | 9 ++-
.../server/task/context/SourceReaderContext.java | 2 +-
.../server/task/flow/SourceFlowLifeCycle.java | 13 +++++
.../operation/source/RequestSplitOperation.java | 2 +-
.../operation/source/SourceEventOperation.java | 64 ++++++++++++++++++++
...ration.java => SourceReaderEventOperation.java} | 68 ++++++----------------
8 files changed, 113 insertions(+), 55 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index e82fbdeac..4f733e88e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -29,13 +29,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-public class ReadonlyConfig {
-
+public class ReadonlyConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
/**
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index da7eb035d..c1586f2b7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -36,6 +36,7 @@ import
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNo
import
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
+import
org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
@@ -83,6 +84,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
public static final int CLEAN_TASKGROUP_CONTEXT_OPERATION = 19;
+ public static final int SOURCE_READER_EVENT_OPERATOR = 20;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -141,6 +144,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
return new GetTaskGroupMetricsOperation();
case CLEAN_TASKGROUP_CONTEXT_OPERATION:
return new CleanTaskGroupContextOperation();
+ case SOURCE_READER_EVENT_OPERATOR:
+ return new SourceReaderEventOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index d616cc273..bd0f2dcff 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -27,6 +27,7 @@ import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.WAITING_RESTORE;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -174,8 +175,12 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
}
}
- public void requestSplit(long taskID) {
- enumerator.handleSplitRequest((int) taskID);
+ /**
+ * Forwards a split request to the enumerator on behalf of the task with the given index.
+ *
+ * @param taskIndex index of the requesting task
+ * @throws ArithmeticException if {@code taskIndex} does not fit in an {@code int}; this is
+ * preferred over a silent narrowing cast that would route the request to the wrong subtask
+ */
+ public void requestSplit(long taskIndex) {
+ enumerator.handleSplitRequest(Math.toIntExact(taskIndex));
+ }
+
+ /**
+ * Forwards a {@link SourceEvent} sent by a reader subtask to the enumerator.
+ *
+ * @param subtaskId index of the sending subtask
+ * @param sourceEvent the event to deliver
+ */
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ enumerator.handleSourceEvent(subtaskId, sourceEvent);
}
public void addTaskMemberMapping(TaskLocation taskID, Address memberAdder)
{
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
index ef9678080..47830ef26 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
@@ -59,6 +59,6 @@ public class SourceReaderContext implements
SourceReader.Context {
@Override
public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
-
+ // Delegate to the owning source life cycle, which ships the event to the enumerator task.
+ this.sourceActionLifeCycle.sendSourceEventToEnumerator(sourceEvent);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b14fb7e57..826e508c2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -21,6 +21,7 @@ import static
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
import static
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.Record;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOper
import
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
+import
org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -146,6 +148,17 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
}
}
+ /**
+ * Sends a {@link SourceEvent} from this reader to the split enumerator task and waits for the
+ * remote operation to complete.
+ *
+ * @param sourceEvent the event to deliver to the enumerator
+ * @throws RuntimeException if the remote operation fails or the calling thread is interrupted
+ * while waiting for it
+ */
+ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
+ try {
+ runningTask.getExecutionContext().sendToMember(
+ new SourceReaderEventOperation(enumeratorTaskLocation, currentTaskLocation, sourceEvent),
+ enumeratorTaskAddress).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so code further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ LOGGER.warning("source send event to enumerator interrupted", e);
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ LOGGER.warning("source send event to enumerator failed", e);
+ throw new RuntimeException(e);
+ }
+ }
+
public void receivedSplits(List<SplitT> splits) {
if (splits.isEmpty()) {
reader.handleNoMoreSplits();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index c540ec528..dcad06c96 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -51,7 +51,7 @@ public class RequestSplitOperation extends Operation
implements IdentifiedDataSe
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
- task.requestSplit(taskID.getTaskID());
+ task.requestSplit(taskID.getTaskIndex());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
exception -> exception instanceof NullPointerException &&
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
new file mode 100644
index 000000000..e62865993
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+
+/**
+ * Base class for task operations that carry a {@link SourceEvent} from one task to another.
+ *
+ * <p>The target task location is passed to {@link TaskOperation}; this class adds the location of
+ * the sending task and the event payload, and serializes both after the superclass state.
+ */
+public abstract class SourceEventOperation extends TaskOperation {
+ /** Location of the task that sent the event. */
+ protected TaskLocation currentTaskLocation;
+
+ /** The event payload delivered to the target task. */
+ protected SourceEvent sourceEvent;
+
+ /**
+ * No-arg constructor for subclasses' no-arg constructors, which the serializer hook invokes
+ * before the fields are populated by {@link #readInternal(ObjectDataInput)}.
+ */
+ public SourceEventOperation() {
+ }
+
+ /**
+ * @param targetTaskLocation location of the task that should receive the event
+ * @param currentTaskLocation location of the task sending the event
+ * @param event the event to deliver
+ */
+ public SourceEventOperation(TaskLocation targetTaskLocation,
+ TaskLocation currentTaskLocation,
+ SourceEvent event) {
+ super(targetTaskLocation);
+ this.currentTaskLocation = currentTaskLocation;
+ this.sourceEvent = event;
+ }
+
+ // Write order must stay in sync with readInternal: superclass state, sender location, event.
+ // NOTE(review): sourceEvent goes through writeObject, so concrete SourceEvent types presumably
+ // need to be serializable by Hazelcast (e.g. java.io.Serializable) — confirm.
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeObject(currentTaskLocation);
+ out.writeObject(sourceEvent);
+ }
+
+ // Read order mirrors writeInternal exactly.
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ currentTaskLocation = in.readObject();
+ sourceEvent = in.readObject();
+ }
+
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
similarity index 51%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index c540ec528..97e873220 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.operation.source;
+import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -24,66 +25,35 @@ import
org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import com.hazelcast.spi.impl.operationservice.Operation;
-
-import java.io.IOException;
-
-public class RequestSplitOperation extends Operation implements
IdentifiedDataSerializable {
-
- private TaskLocation enumeratorTaskID;
-
- private TaskLocation taskID;
+/**
+ * Operation through which a {@link org.apache.seatunnel.api.source.SourceReader} sends a source
+ * event to the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}.
+ */
+public class SourceReaderEventOperation extends SourceEventOperation {
+ public SourceReaderEventOperation() {
+ }
- public RequestSplitOperation() {
+ public SourceReaderEventOperation(TaskLocation targetTaskLocation,
+ TaskLocation currentTaskLocation,
+ SourceEvent event) {
+ super(targetTaskLocation, currentTaskLocation, event);
}
- public RequestSplitOperation(TaskLocation taskID, TaskLocation
enumeratorTaskID) {
- this.enumeratorTaskID = enumeratorTaskID;
- this.taskID = taskID;
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.SOURCE_READER_EVENT_OPERATOR;
}
+ /**
+ * Looks up the enumerator task at {@code taskLocation} through the server's task execution
+ * service and hands it the event, using the sender's task index as the subtask id.
+ */
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
-
+ // A NullPointerException (presumably from getTask returning null before the enumerator task
+ // is registered) is retried, up to the configured retry limit, while the task group has not ended.
RetryUtils.retryWithException(() -> {
- SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
- task.requestSplit(taskID.getTaskID());
+ SourceSplitEnumeratorTask<?> task =
+ server.getTaskExecutionService().getTask(taskLocation);
+ task.handleSourceEvent(currentTaskLocation.getTaskIndex(),
sourceEvent);
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
exception -> exception instanceof NullPointerException &&
- !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
- }
-
- @Override
- public String getServiceName() {
- return SeaTunnelServer.SERVICE_NAME;
- }
-
- @Override
- protected void writeInternal(ObjectDataOutput out) throws IOException {
- super.writeInternal(out);
- out.writeObject(taskID);
- out.writeObject(enumeratorTaskID);
- }
-
- @Override
- protected void readInternal(ObjectDataInput in) throws IOException {
- super.readInternal(in);
- taskID = in.readObject();
- enumeratorTaskID = in.readObject();
- }
-
- @Override
- public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
- }
-
- @Override
- public int getClassId() {
- return TaskDataSerializerHook.REQUEST_SPLIT_TYPE;
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}