This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 919653d83 [Feature][CDC] Support batch processing on multiple-table 
shuffle flow (#4116)
919653d83 is described below

commit 919653d83eb07cc3a9d66a737262a624ecf4cf01
Author: hailin0 <[email protected]>
AuthorDate: Tue Feb 14 10:08:04 2023 +0800

    [Feature][CDC] Support batch processing on multiple-table shuffle flow 
(#4116)
---
 .../cdc/mysql/source/MySqlIncrementalSource.java   |   2 +
 ...tionTransformAction.java => ShuffleAction.java} |  20 +-
 .../dag/execution/ExecutionPlanGenerator.java      |   8 +-
 .../server/dag/execution/PipelineGenerator.java    |   6 +-
 .../server/dag/physical/PhysicalPlanGenerator.java |  26 +--
 .../server/serializable/RecordSerializer.java      |  89 ++++++++
 .../server/serializable/RecordSerializerHook.java  |  43 ++++
 .../engine/server/serializable/TypeId.java         |  22 ++
 .../engine/server/task/SeaTunnelTask.java          |  12 +-
 .../flow/PartitionTransformSinkFlowLifeCycle.java  |  65 ------
 .../PartitionTransformSourceFlowLifeCycle.java     |  82 --------
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java | 234 +++++++++++++++++++++
 .../task/flow/ShuffleSourceFlowLifeCycle.java      | 113 ++++++++++
 13 files changed, 539 insertions(+), 183 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 97f8dca85..66401b4ec 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -39,9 +39,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
 
 import java.time.ZoneId;
 
+@NoArgsConstructor
 @AutoService(SeaTunnelSource.class)
 public class MySqlIncrementalSource<T> extends IncrementalSource<T, 
JdbcSourceConfig> implements SupportParallelism {
     static final String IDENTIFIER = "MySQL-CDC";
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
similarity index 68%
rename from 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
rename to 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
index 9fe9bd599..8c6654659 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
@@ -25,23 +25,23 @@ import java.net.URL;
 import java.util.List;
 import java.util.Set;
 
-public class PartitionTransformAction extends AbstractAction {
+public class ShuffleAction extends AbstractAction {
 
     private final PartitionSeaTunnelTransform partitionTransformation;
 
-    public PartitionTransformAction(long id,
-                                    @NonNull String name,
-                                    @NonNull List<Action> upstreams,
-                                    @NonNull PartitionSeaTunnelTransform 
partitionTransformation,
-                                    @NonNull Set<URL> jarUrls) {
+    public ShuffleAction(long id,
+                         @NonNull String name,
+                         @NonNull List<Action> upstreams,
+                         @NonNull PartitionSeaTunnelTransform 
partitionTransformation,
+                         @NonNull Set<URL> jarUrls) {
         super(id, name, upstreams, jarUrls);
         this.partitionTransformation = partitionTransformation;
     }
 
-    public PartitionTransformAction(long id,
-                                    @NonNull String name,
-                                    @NonNull PartitionSeaTunnelTransform 
partitionTransformation,
-                                    @NonNull Set<URL> jarUrls) {
+    public ShuffleAction(long id,
+                         @NonNull String name,
+                         @NonNull PartitionSeaTunnelTransform 
partitionTransformation,
+                         @NonNull Set<URL> jarUrls) {
         super(id, name, jarUrls);
         this.partitionTransformation = partitionTransformation;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 948a34767..1260b91fc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -22,7 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
@@ -180,10 +180,10 @@ public class ExecutionPlanGenerator {
 
     public static Action recreateAction(Action action, Long id, int 
parallelism) {
         Action newAction;
-        if (action instanceof PartitionTransformAction) {
-            newAction = new PartitionTransformAction(id,
+        if (action instanceof ShuffleAction) {
+            newAction = new ShuffleAction(id,
                 action.getName(),
-                ((PartitionTransformAction) 
action).getPartitionTransformation(),
+                ((ShuffleAction) action).getPartitionTransformation(),
                 action.getJarUrls());
         } else if (action instanceof SinkAction) {
             newAction = new SinkAction<>(id,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index bc5e271f2..d483b9964 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -21,7 +21,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -117,7 +117,7 @@ public class PipelineGenerator {
      * If this execution vertex have partition transform, can't be spilt
      */
     private boolean checkCanSplit(List<ExecutionEdge> edges) {
-        return edges.stream().noneMatch(e -> e.getRightVertex().getAction() 
instanceof PartitionTransformAction)
+        return edges.stream().noneMatch(e -> e.getRightVertex().getAction() 
instanceof ShuffleAction)
                 && edges.stream().anyMatch(e -> 
inputVerticesMap.get(e.getRightVertexId()).size() > 1);
     }
 
@@ -158,7 +158,7 @@ public class PipelineGenerator {
     private ExecutionVertex recreateVertex(ExecutionVertex vertex, int 
parallelism) {
         long id = idGenerator.getNextId();
         Action action = vertex.getAction();
-        return new ExecutionVertex(id, 
ExecutionPlanGenerator.recreateAction(action, id, parallelism), action 
instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+        return new ExecutionVertex(id, 
ExecutionPlanGenerator.recreateAction(action, id, parallelism), action 
instanceof ShuffleAction ? vertex.getParallelism() : parallelism);
     }
 
     private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 1d6c751f6..e9bbf2a49 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -24,7 +24,7 @@ import 
org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
@@ -177,7 +177,7 @@ public class PhysicalPlanGenerator {
                 getSourceTask(edges, sources, pipelineId, totalPipelineNum);
 
             physicalVertexList.addAll(
-                getPartitionTask(edges, pipelineId, totalPipelineNum));
+                getShuffleTask(edges, pipelineId, totalPipelineNum));
 
             CompletableFuture<PipelineStatus> pipelineFuture = new 
CompletableFuture<>();
             waitForCompleteBySubPlanList.add(new 
PassiveCompletableFuture<>(pipelineFuture));
@@ -267,11 +267,11 @@ public class PhysicalPlanGenerator {
             }).filter(Objects::nonNull).collect(Collectors.toList());
     }
 
-    private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
-                                                  int pipelineIndex,
-                                                  int totalPipelineNum) {
-        return edges.stream().filter(s -> s.getLeftVertex().getAction() 
instanceof PartitionTransformAction)
-            .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
+    private List<PhysicalVertex> getShuffleTask(List<ExecutionEdge> edges,
+                                                int pipelineIndex,
+                                                int totalPipelineNum) {
+        return edges.stream().filter(s -> s.getLeftVertex().getAction() 
instanceof ShuffleAction)
+            .map(q -> (ShuffleAction) q.getLeftVertex().getAction())
             .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
             .flatMap(flow -> {
                 List<PhysicalVertex> t = new ArrayList<>();
@@ -292,7 +292,7 @@ public class PhysicalPlanGenerator {
                         executorService,
                         flow.getAction().getParallelism(),
                         new TaskGroupDefaultImpl(taskGroupLocation, 
flow.getAction().getName() +
-                            "-PartitionTransformTask",
+                            "-ShuffleTask",
                             Lists.newArrayList(seaTunnelTask)),
                         flakeIdGenerator,
                         pipelineIndex,
@@ -463,11 +463,11 @@ public class PhysicalPlanGenerator {
                     
config.setCommitterTask(committerTaskIDMap.get((SinkAction<?, ?, ?, ?>) 
flow.getAction()));
                 }
                 flow.setConfig(config);
-            } else if (flow.getAction() instanceof PartitionTransformAction) {
+            } else if (flow.getAction() instanceof ShuffleAction) {
                 PartitionConfig config =
                     new PartitionConfig(
-                        ((PartitionTransformAction) 
flow.getAction()).getPartitionTransformation().getPartitionCount(),
-                        ((PartitionTransformAction) 
flow.getAction()).getPartitionTransformation().getTargetCount(),
+                        ((ShuffleAction) 
flow.getAction()).getPartitionTransformation().getPartitionCount(),
+                        ((ShuffleAction) 
flow.getAction()).getPartitionTransformation().getTargetCount(),
                         parallelismIndex);
                 flow.setConfig(config);
             }
@@ -529,10 +529,10 @@ public class PhysicalPlanGenerator {
         List<Action> actions = edges.stream().filter(e -> 
e.getLeftVertex().getAction().equals(start))
             .map(e -> 
e.getRightVertex().getAction()).collect(Collectors.toList());
         List<Flow> wrappers = actions.stream()
-            .filter(a -> a instanceof PartitionTransformAction || a instanceof 
SinkAction)
+            .filter(a -> a instanceof ShuffleAction || a instanceof SinkAction)
             .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
         wrappers.addAll(actions.stream()
-            .filter(a -> !(a instanceof PartitionTransformAction || a 
instanceof SinkAction))
+            .filter(a -> !(a instanceof ShuffleAction || a instanceof 
SinkAction))
             .map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges, 
a))).collect(Collectors.toList()));
         return wrappers;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
new file mode 100644
index 000000000..955f5fbc8
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Hazelcast {@link StreamSerializer} for {@link Record} values exchanged between shuffle tasks.
+ *
+ * <p>Only two payload types are supported: {@link CheckpointBarrier} and {@link SeaTunnelRow}.
+ */
+public class RecordSerializer implements StreamSerializer<Record> {
+
+    /**
+     * Tag written as the first byte of every serialized record. Explicit codes are used instead of
+     * {@code ordinal()} so that reordering the constants cannot silently change the wire format.
+     */
+    enum RecordDataType {
+        CHECKPOINT_BARRIER((byte) 0),
+        SEATUNNEL_ROW((byte) 1);
+
+        private final byte code;
+
+        RecordDataType(byte code) {
+            this.code = code;
+        }
+
+        byte getCode() {
+            return code;
+        }
+    }
+
+    /**
+     * Writes {@code record} as a one-byte {@link RecordDataType} tag followed by:
+     * <ul>
+     *   <li>barrier: id (long), timestamp (long), checkpoint type constant name (string);</li>
+     *   <li>row: table id (string), row kind (byte), arity (int), then every field via
+     *       {@link ObjectDataOutput#writeObject(Object)}.</li>
+     * </ul>
+     *
+     * @throws UnsupportedEncodingException if the payload is {@code null} or of any other type
+     */
+    @Override
+    public void write(ObjectDataOutput out, Record record) throws IOException {
+        Object data = record.getData();
+        if (data instanceof CheckpointBarrier) {
+            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) data;
+            out.writeByte(RecordDataType.CHECKPOINT_BARRIER.getCode());
+            out.writeLong(checkpointBarrier.getId());
+            out.writeLong(checkpointBarrier.getTimestamp());
+            // Write the enum constant name: read() resolves it with CheckpointType.valueOf, which only
+            // accepts name(). getName() is a separate custom accessor and is not guaranteed to match.
+            out.writeString(checkpointBarrier.getCheckpointType().name());
+        } else if (data instanceof SeaTunnelRow) {
+            SeaTunnelRow row = (SeaTunnelRow) data;
+            out.writeByte(RecordDataType.SEATUNNEL_ROW.getCode());
+            out.writeString(row.getTableId());
+            out.writeByte(row.getRowKind().toByteValue());
+            // int rather than byte: a byte arity overflows for rows with more than 127 fields.
+            out.writeInt(row.getArity());
+            for (Object field : row.getFields()) {
+                out.writeObject(field);
+            }
+        } else {
+            // Avoid an NPE while reporting a null payload.
+            Object type = data == null ? null : data.getClass();
+            throw new UnsupportedEncodingException("Unsupported serialize class: " + type);
+        }
+    }
+
+    /**
+     * Reads a {@link Record} in the layout produced by {@link #write(ObjectDataOutput, Record)}.
+     *
+     * @throws UnsupportedEncodingException if the leading tag byte is not a known {@link RecordDataType}
+     */
+    @Override
+    public Record read(ObjectDataInput in) throws IOException {
+        Object data;
+        byte dataType = in.readByte();
+        if (dataType == RecordDataType.CHECKPOINT_BARRIER.getCode()) {
+            long id = in.readLong();
+            long timestamp = in.readLong();
+            CheckpointType checkpointType = CheckpointType.valueOf(in.readString());
+            data = new CheckpointBarrier(id, timestamp, checkpointType);
+        } else if (dataType == RecordDataType.SEATUNNEL_ROW.getCode()) {
+            String tableId = in.readString();
+            byte rowKind = in.readByte();
+            int arity = in.readInt();
+            SeaTunnelRow row = new SeaTunnelRow(arity);
+            row.setTableId(tableId);
+            row.setRowKind(RowKind.fromByteValue(rowKind));
+            for (int i = 0; i < arity; i++) {
+                row.setField(i, in.readObject());
+            }
+            data = row;
+        } else {
+            throw new UnsupportedEncodingException("Unsupported deserialize data type: " + dataType);
+        }
+        return new Record(data);
+    }
+
+    /** Returns the Hazelcast type id under which this serializer is registered. */
+    @Override
+    public int getTypeId() {
+        return TypeId.RECORD;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
new file mode 100644
index 000000000..a9cccc7b9
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.api.table.type.Record;
+
+import com.google.auto.service.AutoService;
+import com.hazelcast.nio.serialization.Serializer;
+import com.hazelcast.nio.serialization.SerializerHook;
+
+@AutoService(SerializerHook.class)
+public class RecordSerializerHook implements SerializerHook<Record> {
+
+    @Override
+    public Class<Record> getSerializationType() {
+        return Record.class;
+    }
+
+    @Override
+    public Serializer createSerializer() {
+        return new RecordSerializer();
+    }
+
+    @Override
+    public boolean isOverwritable() {
+        return true;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TypeId.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TypeId.java
new file mode 100644
index 000000000..ad3808778
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TypeId.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+public class TypeId {
+    static final int RECORD = 1;
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5c434acbb..e23dbc871 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -33,7 +33,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -55,8 +55,8 @@ import 
org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
 import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
-import 
org.apache.seatunnel.engine.server.task.flow.PartitionTransformSinkFlowLifeCycle;
-import 
org.apache.seatunnel.engine.server.task.flow.PartitionTransformSourceFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.flow.ShuffleSinkFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
@@ -212,12 +212,12 @@ public abstract class SeaTunnelTask extends AbstractTask {
                 lifeCycle =
                     new 
TransformFlowLifeCycle<SeaTunnelRow>((TransformChainAction) f.getAction(), this,
                         new SeaTunnelTransformCollector(flowLifeCycles), 
completableFuture);
-            } else if (f.getAction() instanceof PartitionTransformAction) {
+            } else if (f.getAction() instanceof ShuffleAction) {
                 // TODO use index and taskID to create ringbuffer list
                 if (flow.getNext().isEmpty()) {
-                    lifeCycle = new PartitionTransformSinkFlowLifeCycle(this, 
completableFuture);
+                    lifeCycle = new ShuffleSinkFlowLifeCycle(this, 
completableFuture);
                 } else {
-                    lifeCycle = new 
PartitionTransformSourceFlowLifeCycle(this, completableFuture);
+                    lifeCycle = new ShuffleSourceFlowLifeCycle(this, 
completableFuture);
                 }
             } else {
                 throw new UnknownActionException(f.getAction());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSinkFlowLifeCycle.java
deleted file mode 100644
index 7d4d7d916..000000000
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSinkFlowLifeCycle.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.ringbuffer.Ringbuffer;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-
-public class PartitionTransformSinkFlowLifeCycle extends AbstractFlowLifeCycle 
implements OneInputFlowLifeCycle<Record<?>> {
-
-    // TODO: init ring buffer
-    private Ringbuffer<Record<?>>[] ringbuffers;
-    private final Random random = new Random();
-
-    public PartitionTransformSinkFlowLifeCycle(SeaTunnelTask runningTask, 
CompletableFuture<Void> completableFuture) {
-        super(runningTask, completableFuture);
-    }
-
-    @Override
-    public void received(Record<?> row) throws IOException {
-        // TODO: No space in the buffer
-        if (row.getData() instanceof Barrier) {
-            Barrier barrier = (Barrier) row.getData();
-            runningTask.ack(barrier);
-            if (barrier.prepareClose()) {
-                prepareClose = true;
-            }
-            // The barrier needs to be replicated to all channels
-            for (Ringbuffer<Record<?>> ringBuffer : ringbuffers) {
-                ringBuffer.add(new Record<>(barrier));
-            }
-        } else {
-            if (prepareClose) {
-                return;
-            }
-            getRingBuffer(row).add(row);
-        }
-    }
-
-    private Ringbuffer<Record<?>> getRingBuffer(Record<?> row) {
-        // TODO: choose partition
-        return ringbuffers[random.nextInt(ringbuffers.length)];
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
deleted file mode 100644
index d30396f8f..000000000
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.ringbuffer.Ringbuffer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-public class PartitionTransformSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCycle implements OneOutputFlowLifeCycle<T> {
-
-    // TODO: init ring buffer
-    private Ringbuffer<T>[] ringbuffers;
-
-    private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
-
-    private long currentCheckpointId = Long.MAX_VALUE;
-
-    private int alignedBarriersCounter = 0;
-
-    public PartitionTransformSourceFlowLifeCycle(SeaTunnelTask runningTask, 
CompletableFuture<Void> completableFuture) {
-        super(runningTask, completableFuture);
-    }
-
-    @Override
-    public void collect(Collector<T> collector) throws Exception {
-        for (int i = 0; i < ringbuffers.length; i++) {
-            Ringbuffer<T> ringbuffer = ringbuffers[i];
-            if (ringbuffer.size() <= 0) {
-                continue;
-            }
-            // aligned barrier
-            if (alignedBarriers.get(i) != null && 
alignedBarriers.get(i).getId() == currentCheckpointId) {
-                continue;
-            }
-            // Batch reads are not used because of aligned barriers.
-            // get the oldest item
-            T item = ringbuffer.readOne(ringbuffer.headSequence());
-            Record<?> record = (Record<?>) item;
-            if (record.getData() instanceof Barrier) {
-                Barrier barrier = (Barrier) record.getData();
-                alignedBarriers.put(i, barrier);
-                alignedBarriersCounter++;
-                currentCheckpointId = barrier.getId();
-                if (alignedBarriersCounter == ringbuffers.length) {
-                    runningTask.ack(barrier);
-                    if (barrier.prepareClose()) {
-                        prepareClose = true;
-                    }
-                    collector.collect(item);
-                    alignedBarriersCounter = 0;
-                }
-            } else {
-                if (prepareClose) {
-                    return;
-                }
-                collector.collect(item);
-            }
-        }
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
new file mode 100644
index 000000000..ce2eff995
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@SuppressWarnings("MagicNumber")
+@Slf4j
+public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements 
OneInputFlowLifeCycle<Record<?>> {
+    private Map<String, IQueue<Record<?>>> shuffles;
+    private long shuffleBufferTimesMillis = 1000;
+    private int shuffleBufferSize = 1024;
+    private int shuffleBatchSize = 1024;
+    private Map<String, Queue<Record<?>>> shuffleBuffer;
+    private ShuffleStrategy shuffleStrategy;
+    private long lastModify;
+
+    public ShuffleSinkFlowLifeCycle(SeaTunnelTask runningTask,
+                                    CompletableFuture<Void> completableFuture) 
{
+        super(runningTask, completableFuture);
+        // todo initialize shuffleStrategy
+        this.shuffleStrategy = null;
+        this.shuffles = shuffleStrategy.createShuffles();
+    }
+
+    @Override
+    public void received(Record<?> record) throws IOException {
+        if (record.getData() instanceof Barrier) {
+            // flush shuffle buffer
+            shuffleFlush();
+
+            Barrier barrier = (Barrier) record.getData();
+            runningTask.ack(barrier);
+            if (barrier.prepareClose()) {
+                prepareClose = true;
+            }
+            // The barrier needs to be replicated to all channels
+            for (Map.Entry<String, IQueue<Record<?>>> shuffle : 
shuffles.entrySet()) {
+                IQueue<Record<?>> shuffleQueue = shuffle.getValue();
+                try {
+                    shuffleQueue.put(record);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        } else {
+            if (prepareClose) {
+                return;
+            }
+
+            shuffleItem(record);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        for (Map.Entry<String, IQueue<Record<?>>> shuffleBatch : 
shuffles.entrySet()) {
+            shuffleBatch.getValue().destroy();
+        }
+    }
+
+    public CompletableFuture<Boolean> 
registryScheduleFlushTask(ScheduledExecutorService scheduledExecutorService) {
+        // todo Register when the job started, Unload at the 
end(pause/cancel/crash) of the job
+        CompletableFuture<Boolean> completedFuture = new CompletableFuture();
+        Runnable scheduleFlushTask = new Runnable() {
+            @Override
+            public void run() {
+                if (!prepareClose
+                    && shuffleBufferSize > 0
+                    && System.currentTimeMillis() - lastModify > 
shuffleBufferTimesMillis) {
+
+                    try {
+                        shuffleFlush();
+                    } catch (Exception e) {
+                        log.error("Execute schedule task error.", e);
+                    }
+                }
+
+                // submit next task
+                if (!prepareClose) {
+                    Runnable nextScheduleFlushTask = this;
+                    scheduledExecutorService.schedule(nextScheduleFlushTask, 
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+                } else {
+                    completedFuture.complete(true);
+                }
+            }
+        };
+        scheduledExecutorService.schedule(scheduleFlushTask, 
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+        return completedFuture;
+    }
+
+    private synchronized void shuffleItem(Record<?> record) {
+        String shuffleKey = shuffleStrategy.extractShuffleKey(record);
+        shuffleBuffer.compute(shuffleKey, (key, records) -> new LinkedList<>())
+            .add(record);
+        shuffleBufferSize++;
+
+        if (shuffleBufferSize >= shuffleBatchSize
+            || (shuffleBufferSize > 1 && System.currentTimeMillis() - 
lastModify > shuffleBufferTimesMillis)) {
+            shuffleFlush();
+        }
+
+        lastModify = System.currentTimeMillis();
+    }
+
+    private synchronized void shuffleFlush() {
+        for (Map.Entry<String, Queue<Record<?>>> shuffleBatch : 
shuffleBuffer.entrySet()) {
+            IQueue<Record<?>> shuffleQueue = 
shuffleQueue(shuffleBatch.getKey());
+            Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
+            if (!shuffleQueue.addAll(shuffleBatch.getValue())) {
+                for (; ;) {
+                    Record<?> shuffleItem = shuffleQueueBatch.poll();
+                    if (shuffleItem == null) {
+                        break;
+                    }
+                    try {
+                        shuffleQueue.put(shuffleItem);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            shuffleQueueBatch.clear();
+        }
+        shuffleBufferSize = 0;
+    }
+
+    private IQueue<Record<?>> shuffleQueue(String shuffleKey) {
+        return shuffles.get(shuffleKey);
+    }
+
+    interface ShuffleStrategy {
+        Map<String, IQueue<Record<?>>> createShuffles();
+
+        String extractShuffleKey(Record<?> record);
+    }
+
+    @RequiredArgsConstructor
+    @AllArgsConstructor
+    public static class PartitionShuffle implements ShuffleStrategy {
+        private final HazelcastInstance hazelcast;
+        private final String jobId;
+        private final int partitionNumber;
+        private QueueConfig queueConfig;
+
+        @Override
+        public Map<String, IQueue<Record<?>>> createShuffles() {
+            Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+            for (int i = 0; i < partitionNumber; i++) {
+                String queueName = String.format("PartitionShuffle[%s-%s]", 
jobId, i);
+                QueueConfig queueConfig = 
hazelcast.getConfig().getQueueConfig(queueName);
+                queueConfig.setMaxSize(queueConfig.getMaxSize())
+                    .setBackupCount(queueConfig.getBackupCount())
+                    .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+                    .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+                shuffleMap.put(String.valueOf(i), 
hazelcast.getQueue(queueName));
+            }
+            return shuffleMap;
+        }
+
+        @Override
+        public String extractShuffleKey(Record<?> record) {
+            return RandomStringUtils.random(partitionNumber);
+        }
+    }
+
+    @RequiredArgsConstructor
+    @AllArgsConstructor
+    public static class MultipleRowShuffle implements ShuffleStrategy {
+        private final HazelcastInstance hazelcast;
+        private final String jobId;
+        private final int parallelismIndex;
+        private final MultipleRowType multipleRowType;
+        private QueueConfig queueConfig;
+
+        @Override
+        public Map<String, IQueue<Record<?>>> createShuffles() {
+            Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+            for (Map.Entry<String, SeaTunnelRowType> entry : multipleRowType) {
+                String queueName = 
String.format("MultipleRowShuffle[%s-%s-%s]", jobId, parallelismIndex, 
entry.getKey());
+                QueueConfig queueConfig = 
hazelcast.getConfig().getQueueConfig(queueName);
+                queueConfig.setMaxSize(queueConfig.getMaxSize())
+                    .setBackupCount(queueConfig.getBackupCount())
+                    .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+                    .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+                shuffleMap.put(entry.getKey(), hazelcast.getQueue(queueName));
+            }
+            return shuffleMap;
+        }
+
+        @Override
+        public String extractShuffleKey(Record<?> record) {
+            return ((SeaTunnelRow) record.getData()).getTableId();
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
new file mode 100644
index 000000000..c95f421df
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.flow;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.hazelcast.collection.IQueue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@SuppressWarnings("MagicNumber")
+public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle 
implements OneOutputFlowLifeCycle<Record<?>> {
+    private int shuffleBatchSize = 1024;
+    private IQueue<Record<?>>[] shuffles;
+    private List<Record<?>> unsentBuffer;
+    private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
+    private long currentCheckpointId = Long.MAX_VALUE;
+    private int alignedBarriersCounter = 0;
+
+    public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask,
+                                      CompletableFuture<Void> 
completableFuture) {
+        super(runningTask, completableFuture);
+        // todo initialize shuffles
+    }
+
+    @Override
+    public void collect(Collector<Record<?>> collector) throws Exception {
+        for (int i = 0; i < shuffles.length; i++) {
+            IQueue<Record<?>> shuffleQueue = shuffles[i];
+            if (shuffleQueue.size() == 0) {
+                continue;
+            }
+            // aligned barrier
+            if (alignedBarriers.get(i) != null && 
alignedBarriers.get(i).getId() == currentCheckpointId) {
+                continue;
+            }
+
+            List<Record<?>> shuffleBatch = new LinkedList<>();
+            if (alignedBarriersCounter > 0) {
+                shuffleBatch.add(shuffleQueue.take());
+            } else if (unsentBuffer != null && !unsentBuffer.isEmpty()) {
+                shuffleBatch = unsentBuffer;
+                unsentBuffer = null;
+            } else if (shuffleQueue.drainTo(shuffleBatch, shuffleBatchSize) == 
0) {
+                shuffleBatch.add(shuffleQueue.take());
+            }
+
+            for (int recordIndex = 0; recordIndex < shuffleBatch.size(); 
recordIndex++) {
+                Record<?> record = shuffleBatch.get(recordIndex);
+                if (record.getData() instanceof Barrier) {
+                    Barrier barrier = (Barrier) record.getData();
+
+                    // mark queue barrier
+                    alignedBarriers.put(i, barrier);
+                    alignedBarriersCounter++;
+                    currentCheckpointId = barrier.getId();
+
+                    // publish barrier
+                    if (alignedBarriersCounter == shuffles.length) {
+                        runningTask.ack(barrier);
+                        if (barrier.prepareClose()) {
+                            prepareClose = true;
+                        }
+                        collector.collect(record);
+                        alignedBarriersCounter = 0;
+                        alignedBarriers.clear();
+                    }
+
+                    if (recordIndex + 1 < shuffleBatch.size()) {
+                        unsentBuffer = new 
LinkedList<>(shuffleBatch.subList(recordIndex + 1, shuffleBatch.size()));
+                    }
+                    break;
+                } else {
+                    if (prepareClose) {
+                        return;
+                    }
+                    collector.collect(record);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        for (IQueue<Record<?>> shuffleQueue : shuffles) {
+            shuffleQueue.destroy();
+        }
+    }
+}


Reply via email to