This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 919653d83 [Feature][CDC] Support batch processing on multiple-table
shuffle flow (#4116)
919653d83 is described below
commit 919653d83eb07cc3a9d66a737262a624ecf4cf01
Author: hailin0 <[email protected]>
AuthorDate: Tue Feb 14 10:08:04 2023 +0800
[Feature][CDC] Support batch processing on multiple-table shuffle flow
(#4116)
---
.../cdc/mysql/source/MySqlIncrementalSource.java | 2 +
...tionTransformAction.java => ShuffleAction.java} | 20 +-
.../dag/execution/ExecutionPlanGenerator.java | 8 +-
.../server/dag/execution/PipelineGenerator.java | 6 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 26 +--
.../server/serializable/RecordSerializer.java | 89 ++++++++
.../server/serializable/RecordSerializerHook.java | 43 ++++
.../engine/server/serializable/TypeId.java | 22 ++
.../engine/server/task/SeaTunnelTask.java | 12 +-
.../flow/PartitionTransformSinkFlowLifeCycle.java | 65 ------
.../PartitionTransformSourceFlowLifeCycle.java | 82 --------
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 234 +++++++++++++++++++++
.../task/flow/ShuffleSourceFlowLifeCycle.java | 113 ++++++++++
13 files changed, 539 insertions(+), 183 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 97f8dca85..66401b4ec 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -39,9 +39,11 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
import java.time.ZoneId;
+@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class MySqlIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> implements SupportParallelism {
static final String IDENTIFIER = "MySQL-CDC";
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
similarity index 68%
rename from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
rename to
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
index 9fe9bd599..8c6654659 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
@@ -25,23 +25,23 @@ import java.net.URL;
import java.util.List;
import java.util.Set;
-public class PartitionTransformAction extends AbstractAction {
+public class ShuffleAction extends AbstractAction {
private final PartitionSeaTunnelTransform partitionTransformation;
- public PartitionTransformAction(long id,
- @NonNull String name,
- @NonNull List<Action> upstreams,
- @NonNull PartitionSeaTunnelTransform
partitionTransformation,
- @NonNull Set<URL> jarUrls) {
+ public ShuffleAction(long id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull PartitionSeaTunnelTransform
partitionTransformation,
+ @NonNull Set<URL> jarUrls) {
super(id, name, upstreams, jarUrls);
this.partitionTransformation = partitionTransformation;
}
- public PartitionTransformAction(long id,
- @NonNull String name,
- @NonNull PartitionSeaTunnelTransform
partitionTransformation,
- @NonNull Set<URL> jarUrls) {
+ public ShuffleAction(long id,
+ @NonNull String name,
+ @NonNull PartitionSeaTunnelTransform
partitionTransformation,
+ @NonNull Set<URL> jarUrls) {
super(id, name, jarUrls);
this.partitionTransformation = partitionTransformation;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 948a34767..1260b91fc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -22,7 +22,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
@@ -180,10 +180,10 @@ public class ExecutionPlanGenerator {
public static Action recreateAction(Action action, Long id, int
parallelism) {
Action newAction;
- if (action instanceof PartitionTransformAction) {
- newAction = new PartitionTransformAction(id,
+ if (action instanceof ShuffleAction) {
+ newAction = new ShuffleAction(id,
action.getName(),
- ((PartitionTransformAction)
action).getPartitionTransformation(),
+ ((ShuffleAction) action).getPartitionTransformation(),
action.getJarUrls());
} else if (action instanceof SinkAction) {
newAction = new SinkAction<>(id,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index bc5e271f2..d483b9964 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -21,7 +21,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -117,7 +117,7 @@ public class PipelineGenerator {
* If this execution vertex has a partition transform, it can't be split
*/
private boolean checkCanSplit(List<ExecutionEdge> edges) {
- return edges.stream().noneMatch(e -> e.getRightVertex().getAction()
instanceof PartitionTransformAction)
+ return edges.stream().noneMatch(e -> e.getRightVertex().getAction()
instanceof ShuffleAction)
&& edges.stream().anyMatch(e ->
inputVerticesMap.get(e.getRightVertexId()).size() > 1);
}
@@ -158,7 +158,7 @@ public class PipelineGenerator {
private ExecutionVertex recreateVertex(ExecutionVertex vertex, int
parallelism) {
long id = idGenerator.getNextId();
Action action = vertex.getAction();
- return new ExecutionVertex(id,
ExecutionPlanGenerator.recreateAction(action, id, parallelism), action
instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+ return new ExecutionVertex(id,
ExecutionPlanGenerator.recreateAction(action, id, parallelism), action
instanceof ShuffleAction ? vertex.getParallelism() : parallelism);
}
private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 1d6c751f6..e9bbf2a49 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -24,7 +24,7 @@ import
org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
@@ -177,7 +177,7 @@ public class PhysicalPlanGenerator {
getSourceTask(edges, sources, pipelineId, totalPipelineNum);
physicalVertexList.addAll(
- getPartitionTask(edges, pipelineId, totalPipelineNum));
+ getShuffleTask(edges, pipelineId, totalPipelineNum));
CompletableFuture<PipelineStatus> pipelineFuture = new
CompletableFuture<>();
waitForCompleteBySubPlanList.add(new
PassiveCompletableFuture<>(pipelineFuture));
@@ -267,11 +267,11 @@ public class PhysicalPlanGenerator {
}).filter(Objects::nonNull).collect(Collectors.toList());
}
- private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
- int pipelineIndex,
- int totalPipelineNum) {
- return edges.stream().filter(s -> s.getLeftVertex().getAction()
instanceof PartitionTransformAction)
- .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
+ private List<PhysicalVertex> getShuffleTask(List<ExecutionEdge> edges,
+ int pipelineIndex,
+ int totalPipelineNum) {
+ return edges.stream().filter(s -> s.getLeftVertex().getAction()
instanceof ShuffleAction)
+ .map(q -> (ShuffleAction) q.getLeftVertex().getAction())
.map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
.flatMap(flow -> {
List<PhysicalVertex> t = new ArrayList<>();
@@ -292,7 +292,7 @@ public class PhysicalPlanGenerator {
executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl(taskGroupLocation,
flow.getAction().getName() +
- "-PartitionTransformTask",
+ "-ShuffleTask",
Lists.newArrayList(seaTunnelTask)),
flakeIdGenerator,
pipelineIndex,
@@ -463,11 +463,11 @@ public class PhysicalPlanGenerator {
config.setCommitterTask(committerTaskIDMap.get((SinkAction<?, ?, ?, ?>)
flow.getAction()));
}
flow.setConfig(config);
- } else if (flow.getAction() instanceof PartitionTransformAction) {
+ } else if (flow.getAction() instanceof ShuffleAction) {
PartitionConfig config =
new PartitionConfig(
- ((PartitionTransformAction)
flow.getAction()).getPartitionTransformation().getPartitionCount(),
- ((PartitionTransformAction)
flow.getAction()).getPartitionTransformation().getTargetCount(),
+ ((ShuffleAction)
flow.getAction()).getPartitionTransformation().getPartitionCount(),
+ ((ShuffleAction)
flow.getAction()).getPartitionTransformation().getTargetCount(),
parallelismIndex);
flow.setConfig(config);
}
@@ -529,10 +529,10 @@ public class PhysicalPlanGenerator {
List<Action> actions = edges.stream().filter(e ->
e.getLeftVertex().getAction().equals(start))
.map(e ->
e.getRightVertex().getAction()).collect(Collectors.toList());
List<Flow> wrappers = actions.stream()
- .filter(a -> a instanceof PartitionTransformAction || a instanceof
SinkAction)
+ .filter(a -> a instanceof ShuffleAction || a instanceof SinkAction)
.map(PhysicalExecutionFlow::new).collect(Collectors.toList());
wrappers.addAll(actions.stream()
- .filter(a -> !(a instanceof PartitionTransformAction || a
instanceof SinkAction))
+ .filter(a -> !(a instanceof ShuffleAction || a instanceof
SinkAction))
.map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges,
a))).collect(Collectors.toList()));
return wrappers;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
new file mode 100644
index 000000000..955f5fbc8
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+public class RecordSerializer implements StreamSerializer<Record> {
+ enum RecordDataType {
+ CHECKPOINT_BARRIER,
+ SEATUNNEL_ROW;
+ }
+
+ @Override
+ public void write(ObjectDataOutput out, Record record) throws IOException {
+ Object data = record.getData();
+ if (data instanceof CheckpointBarrier) {
+ CheckpointBarrier checkpointBarrier = (CheckpointBarrier) data;
+ out.writeByte(RecordDataType.CHECKPOINT_BARRIER.ordinal());
+ out.writeLong(checkpointBarrier.getId());
+ out.writeLong(checkpointBarrier.getTimestamp());
+ out.writeString(checkpointBarrier.getCheckpointType().getName());
+ } else if (data instanceof SeaTunnelRow) {
+ SeaTunnelRow row = (SeaTunnelRow) data;
+ out.writeByte(RecordDataType.SEATUNNEL_ROW.ordinal());
+ out.writeString(row.getTableId());
+ out.writeByte(row.getRowKind().toByteValue());
+ out.writeByte(row.getArity());
+ for (Object field : row.getFields()) {
+ out.writeObject(field);
+ }
+ } else {
+ throw new UnsupportedEncodingException("Unsupported serialize
class: " + data.getClass());
+ }
+ }
+
+ @Override
+ public Record read(ObjectDataInput in) throws IOException {
+ Object data;
+ byte dataType = in.readByte();
+ if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) {
+ data = new CheckpointBarrier(in.readLong(), in.readLong(),
CheckpointType.valueOf(in.readString()));
+ } else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) {
+ String tableId = in.readString();
+ byte rowKind = in.readByte();
+ byte arity = in.readByte();
+ SeaTunnelRow row = new SeaTunnelRow(arity);
+ row.setTableId(tableId);
+ row.setRowKind(RowKind.fromByteValue(rowKind));
+ for (int i = 0; i < arity; i++) {
+ row.setField(i, in.readObject());
+ }
+ data = row;
+ } else {
+ throw new UnsupportedEncodingException("Unsupported deserialize
data type: " + dataType);
+ }
+ return new Record(data);
+ }
+
+ @Override
+ public int getTypeId() {
+ return TypeId.RECORD;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
new file mode 100644
index 000000000..a9cccc7b9
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.api.table.type.Record;
+
+import com.google.auto.service.AutoService;
+import com.hazelcast.nio.serialization.Serializer;
+import com.hazelcast.nio.serialization.SerializerHook;
+
+@AutoService(SerializerHook.class)
+public class RecordSerializerHook implements SerializerHook<Record> {
+
+ @Override
+ public Class<Record> getSerializationType() {
+ return Record.class;
+ }
+
+ @Override
+ public Serializer createSerializer() {
+ return new RecordSerializer();
+ }
+
+ @Override
+ public boolean isOverwritable() {
+ return true;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TypeId.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TypeId.java
new file mode 100644
index 000000000..ad3808778
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TypeId.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+public class TypeId {
+ static final int RECORD = 1;
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5c434acbb..e23dbc871 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -33,7 +33,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -55,8 +55,8 @@ import
org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
import
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
-import
org.apache.seatunnel.engine.server.task.flow.PartitionTransformSinkFlowLifeCycle;
-import
org.apache.seatunnel.engine.server.task.flow.PartitionTransformSourceFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.flow.ShuffleSinkFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
@@ -212,12 +212,12 @@ public abstract class SeaTunnelTask extends AbstractTask {
lifeCycle =
new
TransformFlowLifeCycle<SeaTunnelRow>((TransformChainAction) f.getAction(), this,
new SeaTunnelTransformCollector(flowLifeCycles),
completableFuture);
- } else if (f.getAction() instanceof PartitionTransformAction) {
+ } else if (f.getAction() instanceof ShuffleAction) {
// TODO use index and taskID to create ringbuffer list
if (flow.getNext().isEmpty()) {
- lifeCycle = new PartitionTransformSinkFlowLifeCycle(this,
completableFuture);
+ lifeCycle = new ShuffleSinkFlowLifeCycle(this,
completableFuture);
} else {
- lifeCycle = new
PartitionTransformSourceFlowLifeCycle(this, completableFuture);
+ lifeCycle = new ShuffleSourceFlowLifeCycle(this,
completableFuture);
}
} else {
throw new UnknownActionException(f.getAction());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSinkFlowLifeCycle.java
deleted file mode 100644
index 7d4d7d916..000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSinkFlowLifeCycle.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.ringbuffer.Ringbuffer;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-
-public class PartitionTransformSinkFlowLifeCycle extends AbstractFlowLifeCycle
implements OneInputFlowLifeCycle<Record<?>> {
-
- // TODO: init ring buffer
- private Ringbuffer<Record<?>>[] ringbuffers;
- private final Random random = new Random();
-
- public PartitionTransformSinkFlowLifeCycle(SeaTunnelTask runningTask,
CompletableFuture<Void> completableFuture) {
- super(runningTask, completableFuture);
- }
-
- @Override
- public void received(Record<?> row) throws IOException {
- // TODO: No space in the buffer
- if (row.getData() instanceof Barrier) {
- Barrier barrier = (Barrier) row.getData();
- runningTask.ack(barrier);
- if (barrier.prepareClose()) {
- prepareClose = true;
- }
- // The barrier needs to be replicated to all channels
- for (Ringbuffer<Record<?>> ringBuffer : ringbuffers) {
- ringBuffer.add(new Record<>(barrier));
- }
- } else {
- if (prepareClose) {
- return;
- }
- getRingBuffer(row).add(row);
- }
- }
-
- private Ringbuffer<Record<?>> getRingBuffer(Record<?> row) {
- // TODO: choose partition
- return ringbuffers[random.nextInt(ringbuffers.length)];
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
deleted file mode 100644
index d30396f8f..000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.ringbuffer.Ringbuffer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-public class PartitionTransformSourceFlowLifeCycle<T> extends
AbstractFlowLifeCycle implements OneOutputFlowLifeCycle<T> {
-
- // TODO: init ring buffer
- private Ringbuffer<T>[] ringbuffers;
-
- private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
-
- private long currentCheckpointId = Long.MAX_VALUE;
-
- private int alignedBarriersCounter = 0;
-
- public PartitionTransformSourceFlowLifeCycle(SeaTunnelTask runningTask,
CompletableFuture<Void> completableFuture) {
- super(runningTask, completableFuture);
- }
-
- @Override
- public void collect(Collector<T> collector) throws Exception {
- for (int i = 0; i < ringbuffers.length; i++) {
- Ringbuffer<T> ringbuffer = ringbuffers[i];
- if (ringbuffer.size() <= 0) {
- continue;
- }
- // aligned barrier
- if (alignedBarriers.get(i) != null &&
alignedBarriers.get(i).getId() == currentCheckpointId) {
- continue;
- }
- // Batch reads are not used because of aligned barriers.
- // get the oldest item
- T item = ringbuffer.readOne(ringbuffer.headSequence());
- Record<?> record = (Record<?>) item;
- if (record.getData() instanceof Barrier) {
- Barrier barrier = (Barrier) record.getData();
- alignedBarriers.put(i, barrier);
- alignedBarriersCounter++;
- currentCheckpointId = barrier.getId();
- if (alignedBarriersCounter == ringbuffers.length) {
- runningTask.ack(barrier);
- if (barrier.prepareClose()) {
- prepareClose = true;
- }
- collector.collect(item);
- alignedBarriersCounter = 0;
- }
- } else {
- if (prepareClose) {
- return;
- }
- collector.collect(item);
- }
- }
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
new file mode 100644
index 000000000..ce2eff995
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@SuppressWarnings("MagicNumber")
+@Slf4j
+public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements
OneInputFlowLifeCycle<Record<?>> {
+ private Map<String, IQueue<Record<?>>> shuffles;
+ private long shuffleBufferTimesMillis = 1000;
+ private int shuffleBufferSize = 1024;
+ private int shuffleBatchSize = 1024;
+ private Map<String, Queue<Record<?>>> shuffleBuffer;
+ private ShuffleStrategy shuffleStrategy;
+ private long lastModify;
+
+ public ShuffleSinkFlowLifeCycle(SeaTunnelTask runningTask,
+ CompletableFuture<Void> completableFuture)
{
+ super(runningTask, completableFuture);
+ // todo initialize shuffleStrategy
+ this.shuffleStrategy = null;
+ this.shuffles = shuffleStrategy.createShuffles();
+ }
+
+ @Override
+ public void received(Record<?> record) throws IOException {
+ if (record.getData() instanceof Barrier) {
+ // flush shuffle buffer
+ shuffleFlush();
+
+ Barrier barrier = (Barrier) record.getData();
+ runningTask.ack(barrier);
+ if (barrier.prepareClose()) {
+ prepareClose = true;
+ }
+ // The barrier needs to be replicated to all channels
+ for (Map.Entry<String, IQueue<Record<?>>> shuffle :
shuffles.entrySet()) {
+ IQueue<Record<?>> shuffleQueue = shuffle.getValue();
+ try {
+ shuffleQueue.put(record);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ } else {
+ if (prepareClose) {
+ return;
+ }
+
+ shuffleItem(record);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ for (Map.Entry<String, IQueue<Record<?>>> shuffleBatch :
shuffles.entrySet()) {
+ shuffleBatch.getValue().destroy();
+ }
+ }
+
+ public CompletableFuture<Boolean>
registryScheduleFlushTask(ScheduledExecutorService scheduledExecutorService) {
+ // todo Register when the job started, Unload at the
end(pause/cancel/crash) of the job
+ CompletableFuture<Boolean> completedFuture = new CompletableFuture();
+ Runnable scheduleFlushTask = new Runnable() {
+ @Override
+ public void run() {
+ if (!prepareClose
+ && shuffleBufferSize > 0
+ && System.currentTimeMillis() - lastModify >
shuffleBufferTimesMillis) {
+
+ try {
+ shuffleFlush();
+ } catch (Exception e) {
+ log.error("Execute schedule task error.", e);
+ }
+ }
+
+ // submit next task
+ if (!prepareClose) {
+ Runnable nextScheduleFlushTask = this;
+ scheduledExecutorService.schedule(nextScheduleFlushTask,
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+ } else {
+ completedFuture.complete(true);
+ }
+ }
+ };
+ scheduledExecutorService.schedule(scheduleFlushTask,
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+ return completedFuture;
+ }
+
+ private synchronized void shuffleItem(Record<?> record) {
+ String shuffleKey = shuffleStrategy.extractShuffleKey(record);
+ shuffleBuffer.compute(shuffleKey, (key, records) -> new LinkedList<>())
+ .add(record);
+ shuffleBufferSize++;
+
+ if (shuffleBufferSize >= shuffleBatchSize
+ || (shuffleBufferSize > 1 && System.currentTimeMillis() -
lastModify > shuffleBufferTimesMillis)) {
+ shuffleFlush();
+ }
+
+ lastModify = System.currentTimeMillis();
+ }
+
+ private synchronized void shuffleFlush() {
+ for (Map.Entry<String, Queue<Record<?>>> shuffleBatch :
shuffleBuffer.entrySet()) {
+ IQueue<Record<?>> shuffleQueue =
shuffleQueue(shuffleBatch.getKey());
+ Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
+ if (!shuffleQueue.addAll(shuffleBatch.getValue())) {
+ for (; ;) {
+ Record<?> shuffleItem = shuffleQueueBatch.poll();
+ if (shuffleItem == null) {
+ break;
+ }
+ try {
+ shuffleQueue.put(shuffleItem);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ shuffleQueueBatch.clear();
+ }
+ shuffleBufferSize = 0;
+ }
+
+ private IQueue<Record<?>> shuffleQueue(String shuffleKey) {
+ return shuffles.get(shuffleKey);
+ }
+
+ interface ShuffleStrategy {
+ Map<String, IQueue<Record<?>>> createShuffles();
+
+ String extractShuffleKey(Record<?> record);
+ }
+
+ @RequiredArgsConstructor
+ @AllArgsConstructor
+ public static class PartitionShuffle implements ShuffleStrategy {
+ private final HazelcastInstance hazelcast;
+ private final String jobId;
+ private final int partitionNumber;
+ private QueueConfig queueConfig;
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles() {
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (int i = 0; i < partitionNumber; i++) {
+ String queueName = String.format("PartitionShuffle[%s-%s]",
jobId, i);
+ QueueConfig queueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
+ queueConfig.setMaxSize(queueConfig.getMaxSize())
+ .setBackupCount(queueConfig.getBackupCount())
+ .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+ .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+ shuffleMap.put(String.valueOf(i),
hazelcast.getQueue(queueName));
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String extractShuffleKey(Record<?> record) {
+ return RandomStringUtils.random(partitionNumber);
+ }
+ }
+
+ @RequiredArgsConstructor
+ @AllArgsConstructor
+ public static class MultipleRowShuffle implements ShuffleStrategy {
+ private final HazelcastInstance hazelcast;
+ private final String jobId;
+ private final int parallelismIndex;
+ private final MultipleRowType multipleRowType;
+ private QueueConfig queueConfig;
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles() {
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (Map.Entry<String, SeaTunnelRowType> entry : multipleRowType) {
+ String queueName =
String.format("MultipleRowShuffle[%s-%s-%s]", jobId, parallelismIndex,
entry.getKey());
+ QueueConfig queueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
+ queueConfig.setMaxSize(queueConfig.getMaxSize())
+ .setBackupCount(queueConfig.getBackupCount())
+ .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+ .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+ shuffleMap.put(entry.getKey(), hazelcast.getQueue(queueName));
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String extractShuffleKey(Record<?> record) {
+ return ((SeaTunnelRow) record.getData()).getTableId();
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
new file mode 100644
index 000000000..c95f421df
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.hazelcast.collection.IQueue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@SuppressWarnings("MagicNumber")
+public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
implements OneOutputFlowLifeCycle<Record<?>> {
+ private int shuffleBatchSize = 1024;
+ private IQueue<Record<?>>[] shuffles;
+ private List<Record<?>> unsentBuffer;
+ private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
+ private long currentCheckpointId = Long.MAX_VALUE;
+ private int alignedBarriersCounter = 0;
+
+ public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask,
+ CompletableFuture<Void>
completableFuture) {
+ super(runningTask, completableFuture);
+ // todo initialize shuffles
+ }
+
+ @Override
+ public void collect(Collector<Record<?>> collector) throws Exception {
+ for (int i = 0; i < shuffles.length; i++) {
+ IQueue<Record<?>> shuffleQueue = shuffles[i];
+ if (shuffleQueue.size() == 0) {
+ continue;
+ }
+ // aligned barrier
+ if (alignedBarriers.get(i) != null &&
alignedBarriers.get(i).getId() == currentCheckpointId) {
+ continue;
+ }
+
+ List<Record<?>> shuffleBatch = new LinkedList<>();
+ if (alignedBarriersCounter > 0) {
+ shuffleBatch.add(shuffleQueue.take());
+ } else if (unsentBuffer != null && !unsentBuffer.isEmpty()) {
+ shuffleBatch = unsentBuffer;
+ unsentBuffer = null;
+ } else if (shuffleQueue.drainTo(shuffleBatch, shuffleBatchSize) ==
0) {
+ shuffleBatch.add(shuffleQueue.take());
+ }
+
+ for (int recordIndex = 0; recordIndex < shuffleBatch.size();
recordIndex++) {
+ Record<?> record = shuffleBatch.get(recordIndex);
+ if (record.getData() instanceof Barrier) {
+ Barrier barrier = (Barrier) record.getData();
+
+ // mark queue barrier
+ alignedBarriers.put(i, barrier);
+ alignedBarriersCounter++;
+ currentCheckpointId = barrier.getId();
+
+ // publish barrier
+ if (alignedBarriersCounter == shuffles.length) {
+ runningTask.ack(barrier);
+ if (barrier.prepareClose()) {
+ prepareClose = true;
+ }
+ collector.collect(record);
+ alignedBarriersCounter = 0;
+ alignedBarriers.clear();
+ }
+
+ if (recordIndex + 1 < shuffleBatch.size()) {
+ unsentBuffer = new
LinkedList<>(shuffleBatch.subList(recordIndex + 1, shuffleBatch.size()));
+ }
+ break;
+ } else {
+ if (prepareClose) {
+ return;
+ }
+ collector.collect(record);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ for (IQueue<Record<?>> shuffleQueue : shuffles) {
+ shuffleQueue.destroy();
+ }
+ }
+}