This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 507ca8561 [Hotfix][Zeta] Fix shuffle checkpoint (#4224)
507ca8561 is described below

commit 507ca85611b655be7f99b8530890a25705da7b31
Author: hailin0 <[email protected]>
AuthorDate: Sat Feb 25 17:02:38 2023 +0800

    [Hotfix][Zeta] Fix shuffle checkpoint (#4224)
---
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java | 31 ++++++---
 .../server/checkpoint/PendingCheckpoint.java       |  2 +-
 .../dag/execution/ExecutionPlanGenerator.java      |  5 +-
 .../server/dag/physical/PhysicalPlanGenerator.java | 80 +++++++++++-----------
 .../engine/server/task/SeaTunnelTask.java          |  6 +-
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java | 19 +++--
 .../task/flow/ShuffleSourceFlowLifeCycle.java      | 19 +++--
 7 files changed, 94 insertions(+), 68 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 2c0087213..98edb185a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
 
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -44,6 +45,7 @@ import java.util.Map;
 /**
  * Deserialization schema from Debezium object to {@link SeaTunnelRow}.
  */
+@Slf4j
 public final class SeaTunnelRowDebeziumDeserializeSchema
     implements DebeziumDeserializationSchema<SeaTunnelRow> {
     private static final long serialVersionUID = 1L;
@@ -56,9 +58,9 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
     /**
      * Runtime converter that converts Kafka {@link SourceRecord}s into {@link 
SeaTunnelRow} consisted of
      */
-    private final SeaTunnelRowDebeziumDeserializationConverters 
singleRowConverter;
+    private final SeaTunnelRowDebeziumDeserializationConverters 
singleTableRowConverter;
 
-    private final Map<String, SeaTunnelRowDebeziumDeserializationConverters> 
multipleRowConverters;
+    private final Map<String, SeaTunnelRowDebeziumDeserializationConverters> 
multipleTableRowConverters;
 
     /**
      * Validator to validate the row value.
@@ -80,28 +82,28 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
         ZoneId serverTimeZone,
         DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
 
-        SeaTunnelRowDebeziumDeserializationConverters singleRowConverter = 
null;
-        Map<String, SeaTunnelRowDebeziumDeserializationConverters> 
multipleRowConverters = Collections.emptyMap();
+        SeaTunnelRowDebeziumDeserializationConverters singleTableRowConverter 
= null;
+        Map<String, SeaTunnelRowDebeziumDeserializationConverters> 
multipleTableRowConverters = Collections.emptyMap();
         if (physicalDataType instanceof MultipleRowType) {
-            multipleRowConverters = new HashMap<>();
+            multipleTableRowConverters = new HashMap<>();
             for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType) 
physicalDataType) {
                 SeaTunnelRowDebeziumDeserializationConverters itemRowConverter 
= new SeaTunnelRowDebeziumDeserializationConverters(
                     item.getValue(),
                     metadataConverters,
                     serverTimeZone,
                     userDefinedConverterFactory);
-                multipleRowConverters.put(item.getKey(), itemRowConverter);
+                multipleTableRowConverters.put(item.getKey(), 
itemRowConverter);
             }
         } else {
-            singleRowConverter = new 
SeaTunnelRowDebeziumDeserializationConverters(
+            singleTableRowConverter = new 
SeaTunnelRowDebeziumDeserializationConverters(
                 (SeaTunnelRowType) physicalDataType,
                 metadataConverters,
                 serverTimeZone,
                 userDefinedConverterFactory
             );
         }
-        this.singleRowConverter = singleRowConverter;
-        this.multipleRowConverters = multipleRowConverters;
+        this.singleTableRowConverter = singleTableRowConverter;
+        this.multipleTableRowConverters = multipleTableRowConverters;
         this.resultTypeInfo = checkNotNull(resultType);
         this.validator = checkNotNull(validator);
     }
@@ -114,7 +116,16 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
 
         Struct sourceStruct = 
messageStruct.getStruct(Envelope.FieldName.SOURCE);
         String tableName = 
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
-        SeaTunnelRowDebeziumDeserializationConverters converters = 
multipleRowConverters.getOrDefault(tableName, singleRowConverter);
+        SeaTunnelRowDebeziumDeserializationConverters converters;
+        if (!multipleTableRowConverters.isEmpty()) {
+            converters = multipleTableRowConverters.get(tableName);
+            if (converters == null) {
+                log.debug("Ignore newly added table {}", tableName);
+                return;
+            }
+        } else {
+            converters = singleTableRowConverter;
+        }
 
         if (operation == Envelope.Operation.CREATE || operation == 
Envelope.Operation.READ) {
             SeaTunnelRow insert = extractAfterRow(converters, record, 
messageStruct, valueSchema);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 23ad0aa59..013b351b9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -124,7 +124,7 @@ public class PendingCheckpoint implements Checkpoint {
         for (ActionSubtaskState state : states) {
             ActionState actionState = actionStates.get(state.getActionId());
             if (actionState == null) {
-                return;
+                continue;
             }
             stateSize += 
state.getState().stream().filter(Objects::nonNull).map(s -> s.length).count();
             actionState.reportState(state.getIndex(), state);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 8863a70d3..704b35ff1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -201,13 +201,14 @@ public class ExecutionPlanGenerator {
         String shuffleActionName = String.format("Shuffle [%s -> table[0~%s]]",
             sourceAction.getName(), ((MultipleRowType) 
sourceProducedType).getTableIds().length - 1);
         ShuffleAction shuffleAction = new ShuffleAction(shuffleVertexId, 
shuffleActionName, shuffleConfig);
-        // multiple-row shuffle default parallelism is 1
-        shuffleAction.setParallelism(1);
+        shuffleAction.setParallelism(sourceAction.getParallelism());
         ExecutionVertex shuffleVertex = new ExecutionVertex(shuffleVertexId, 
shuffleAction, shuffleAction.getParallelism());
         ExecutionEdge sourceToShuffleEdge = new 
ExecutionEdge(sourceExecutionVertex, shuffleVertex);
         newExecutionEdges.add(sourceToShuffleEdge);
 
         for (ExecutionVertex sinkVertex : sinkVertices) {
+            sinkVertex.setParallelism(1);
+            sinkVertex.getAction().setParallelism(1);
             ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, 
sinkVertex);
             newExecutionEdges.add(shuffleToSinkEdge);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index c55a14e7e..cdc2cac22 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -297,46 +297,46 @@ public class PhysicalPlanGenerator {
 
                         long taskIDPrefix = idGenerator.getNextId();
                         long taskGroupIDPrefix = idGenerator.getNextId();
-                        for (int parallelismIndex = 0; parallelismIndex < 
flow.getAction().getParallelism(); parallelismIndex++) {
-                            ShuffleStrategy shuffleStrategyOfSinkFlow = 
shuffleMultipleRowStrategy.toBuilder()
-                                .targetTableId(sinkTableId)
-                                .build();
-                            ShuffleConfig shuffleConfigOfSinkFlow = 
shuffleConfig.toBuilder()
-                                .shuffleStrategy(shuffleStrategyOfSinkFlow)
-                                .build();
-                            long shuffleActionId = idGenerator.getNextId();
-                            String shuffleActionName = String.format("Shuffle 
[table[%s] -> %s]", sinkTableIndex, sinkAction.getName());
-                            ShuffleAction shuffleActionOfSinkFlow = new 
ShuffleAction(shuffleActionId, shuffleActionName, shuffleConfigOfSinkFlow);
-                            shuffleActionOfSinkFlow.setParallelism(1);
-                            PhysicalExecutionFlow shuffleFlow = new 
PhysicalExecutionFlow(shuffleActionOfSinkFlow, 
Collections.singletonList(sinkFlow));
-                            setFlowConfig(shuffleFlow, parallelismIndex);
-
-                            long taskGroupID = 
mixIDPrefixAndIndex(taskGroupIDPrefix, parallelismIndex);
-                            TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(
-                                jobImmutableInformation.getJobId(), 
pipelineIndex, taskGroupID);
-                            TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, parallelismIndex);
-                            SeaTunnelTask seaTunnelTask = new 
TransformSeaTunnelTask(
-                                jobImmutableInformation.getJobId(), 
taskLocation, parallelismIndex, shuffleFlow);
-
-                            // checkpoint
-                            fillCheckpointPlan(seaTunnelTask);
-                            physicalVertices.add(new PhysicalVertex(
-                                parallelismIndex,
-                                executorService,
-                                shuffleFlow.getAction().getParallelism(),
-                                new TaskGroupDefaultImpl(taskGroupLocation, 
shuffleFlow.getAction().getName() +
-                                    "-ShuffleTask",
-                                    Collections.singletonList(seaTunnelTask)),
-                                flakeIdGenerator,
-                                pipelineIndex,
-                                totalPipelineNum,
-                                seaTunnelTask.getJarsUrl(),
-                                jobImmutableInformation,
-                                initializationTimestamp,
-                                nodeEngine,
-                                runningJobStateIMap,
-                                runningJobStateTimestampsIMap));
-                        }
+                        int parallelismIndex = 0;
+
+                        ShuffleStrategy shuffleStrategyOfSinkFlow = 
shuffleMultipleRowStrategy.toBuilder()
+                            .targetTableId(sinkTableId)
+                            .build();
+                        ShuffleConfig shuffleConfigOfSinkFlow = 
shuffleConfig.toBuilder()
+                            .shuffleStrategy(shuffleStrategyOfSinkFlow)
+                            .build();
+                        long shuffleActionId = idGenerator.getNextId();
+                        String shuffleActionName = String.format("Shuffle 
[table[%s] -> %s]", sinkTableIndex, sinkAction.getName());
+                        ShuffleAction shuffleActionOfSinkFlow = new 
ShuffleAction(shuffleActionId, shuffleActionName, shuffleConfigOfSinkFlow);
+                        shuffleActionOfSinkFlow.setParallelism(1);
+                        PhysicalExecutionFlow shuffleFlow = new 
PhysicalExecutionFlow(shuffleActionOfSinkFlow, 
Collections.singletonList(sinkFlow));
+                        setFlowConfig(shuffleFlow, parallelismIndex);
+
+                        long taskGroupID = 
mixIDPrefixAndIndex(taskGroupIDPrefix, parallelismIndex);
+                        TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(
+                            jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
+                        TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, parallelismIndex);
+                        SeaTunnelTask seaTunnelTask = new 
TransformSeaTunnelTask(
+                            jobImmutableInformation.getJobId(), taskLocation, 
parallelismIndex, shuffleFlow);
+
+                        // checkpoint
+                        fillCheckpointPlan(seaTunnelTask);
+                        physicalVertices.add(new PhysicalVertex(
+                            parallelismIndex,
+                            executorService,
+                            shuffleFlow.getAction().getParallelism(),
+                            new TaskGroupDefaultImpl(taskGroupLocation, 
shuffleFlow.getAction().getName() +
+                                "-ShuffleTask",
+                                Collections.singletonList(seaTunnelTask)),
+                            flakeIdGenerator,
+                            pipelineIndex,
+                            totalPipelineNum,
+                            seaTunnelTask.getJarsUrl(),
+                            jobImmutableInformation,
+                            initializationTimestamp,
+                            nodeEngine,
+                            runningJobStateIMap,
+                            runningJobStateTimestampsIMap));
                     }
                 } else {
                     long taskIDPrefix = idGenerator.getNextId();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 0014eb9f8..f29fa39f5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -34,7 +34,6 @@ import 
org.apache.seatunnel.common.utils.function.ConsumerWithException;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -215,14 +214,13 @@ public abstract class SeaTunnelTask extends AbstractTask {
                         new SeaTunnelTransformCollector(flowLifeCycles), 
completableFuture);
             } else if (f.getAction() instanceof ShuffleAction) {
                 ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
-                ShuffleConfig shuffleConfig = shuffleAction.getConfig();
                 HazelcastInstance hazelcastInstance = 
getExecutionContext().getInstance();
                 if (flow.getNext().isEmpty()) {
                     lifeCycle = new ShuffleSinkFlowLifeCycle(
-                        this, indexID, shuffleConfig, hazelcastInstance, 
completableFuture);
+                        this, indexID, shuffleAction, hazelcastInstance, 
completableFuture);
                 } else {
                     lifeCycle = new ShuffleSourceFlowLifeCycle(
-                        this, indexID, shuffleConfig, hazelcastInstance, 
completableFuture);
+                        this, indexID, shuffleAction, hazelcastInstance, 
completableFuture);
                 }
                 outputs = flowLifeCycles;
             } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 546ee782c..5bad7e239 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -28,6 +28,7 @@ import com.hazelcast.core.HazelcastInstance;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements 
OneInputFlowLifeCycle<Record<?>> {
     private final int pipelineId;
     private final int taskIndex;
+    private final ShuffleAction shuffleAction;
     private final Map<String, IQueue<Record<?>>> shuffles;
     private final int shuffleBatchSize;
     private final long shuffleBatchFlushInterval;
@@ -51,16 +53,17 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
 
     public ShuffleSinkFlowLifeCycle(SeaTunnelTask runningTask,
                                     int taskIndex,
-                                    ShuffleConfig shuffleConfig,
+                                    ShuffleAction shuffleAction,
                                     HazelcastInstance hazelcastInstance,
                                     CompletableFuture<Void> completableFuture) 
{
         super(runningTask, completableFuture);
         this.pipelineId = 
runningTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
         this.taskIndex = taskIndex;
-        this.shuffleStrategy = shuffleConfig.getShuffleStrategy();
+        this.shuffleAction = shuffleAction;
+        this.shuffleStrategy = shuffleAction.getConfig().getShuffleStrategy();
         this.shuffles = shuffleStrategy.createShuffles(hazelcastInstance, 
pipelineId, taskIndex);
-        this.shuffleBatchSize = shuffleConfig.getBatchSize();
-        this.shuffleBatchFlushInterval = shuffleConfig.getBatchFlushInterval();
+        this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
+        this.shuffleBatchFlushInterval = 
shuffleAction.getConfig().getBatchFlushInterval();
         this.shuffleBuffer = new HashMap<>();
     }
 
@@ -71,10 +74,14 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
             shuffleFlush();
 
             Barrier barrier = (Barrier) record.getData();
-            runningTask.ack(barrier);
             if (barrier.prepareClose()) {
                 prepareClose = true;
             }
+            if (barrier.snapshot()) {
+                runningTask.addState(barrier, shuffleAction.getId(), 
Collections.emptyList());
+            }
+            runningTask.ack(barrier);
+
             // The barrier needs to be replicated to all channels
             for (Map.Entry<String, IQueue<Record<?>>> shuffle : 
shuffles.entrySet()) {
                 IQueue<Record<?>> shuffleQueue = shuffle.getValue();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 43c450f32..6720c9a7b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
@@ -27,6 +27,7 @@ import com.hazelcast.collection.IQueue;
 import com.hazelcast.core.HazelcastInstance;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
 
 @SuppressWarnings("MagicNumber")
 public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle 
implements OneOutputFlowLifeCycle<Record<?>> {
+    private final ShuffleAction shuffleAction;
     private final int shuffleBatchSize;
     private final IQueue<Record<?>>[] shuffles;
     private List<Record<?>> unsentBuffer;
@@ -44,13 +46,15 @@ public class ShuffleSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCycle impleme
 
     public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask,
                                       int taskIndex,
-                                      ShuffleConfig shuffleConfig,
+                                      ShuffleAction shuffleAction,
                                       HazelcastInstance hazelcastInstance,
                                       CompletableFuture<Void> 
completableFuture) {
         super(runningTask, completableFuture);
         int pipelineId = runningTask.getTaskLocation().getPipelineId();
-        this.shuffles = 
shuffleConfig.getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId, 
taskIndex);
-        this.shuffleBatchSize = shuffleConfig.getBatchSize();
+        this.shuffleAction = shuffleAction;
+        this.shuffles = shuffleAction.getConfig()
+            .getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId, 
taskIndex);
+        this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
     }
 
     @Override
@@ -87,11 +91,16 @@ public class ShuffleSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCycle impleme
 
                     // publish barrier
                     if (alignedBarriersCounter == shuffles.length) {
-                        runningTask.ack(barrier);
                         if (barrier.prepareClose()) {
                             prepareClose = true;
                         }
+                        if (barrier.snapshot()) {
+                            runningTask.addState(barrier, 
shuffleAction.getId(), Collections.emptyList());
+                        }
+                        runningTask.ack(barrier);
+
                         collector.collect(record);
+
                         alignedBarriersCounter = 0;
                         alignedBarriers.clear();
                     }

Reply via email to