This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 507ca8561 [Hotfix][Zeta] Fix shuffle checkpoint (#4224)
507ca8561 is described below
commit 507ca85611b655be7f99b8530890a25705da7b31
Author: hailin0 <[email protected]>
AuthorDate: Sat Feb 25 17:02:38 2023 +0800
[Hotfix][Zeta] Fix shuffle checkpoint (#4224)
---
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 31 ++++++---
.../server/checkpoint/PendingCheckpoint.java | 2 +-
.../dag/execution/ExecutionPlanGenerator.java | 5 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 80 +++++++++++-----------
.../engine/server/task/SeaTunnelTask.java | 6 +-
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 19 +++--
.../task/flow/ShuffleSourceFlowLifeCycle.java | 19 +++--
7 files changed, 94 insertions(+), 68 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 2c0087213..98edb185a 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -31,6 +31,7 @@ import
org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -44,6 +45,7 @@ import java.util.Map;
/**
* Deserialization schema from Debezium object to {@link SeaTunnelRow}.
*/
+@Slf4j
public final class SeaTunnelRowDebeziumDeserializeSchema
implements DebeziumDeserializationSchema<SeaTunnelRow> {
private static final long serialVersionUID = 1L;
@@ -56,9 +58,9 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
/**
* Runtime converter that converts Kafka {@link SourceRecord}s into {@link
SeaTunnelRow} consisted of
*/
- private final SeaTunnelRowDebeziumDeserializationConverters
singleRowConverter;
+ private final SeaTunnelRowDebeziumDeserializationConverters
singleTableRowConverter;
- private final Map<String, SeaTunnelRowDebeziumDeserializationConverters>
multipleRowConverters;
+ private final Map<String, SeaTunnelRowDebeziumDeserializationConverters>
multipleTableRowConverters;
/**
* Validator to validate the row value.
@@ -80,28 +82,28 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
ZoneId serverTimeZone,
DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
- SeaTunnelRowDebeziumDeserializationConverters singleRowConverter =
null;
- Map<String, SeaTunnelRowDebeziumDeserializationConverters>
multipleRowConverters = Collections.emptyMap();
+ SeaTunnelRowDebeziumDeserializationConverters singleTableRowConverter
= null;
+ Map<String, SeaTunnelRowDebeziumDeserializationConverters>
multipleTableRowConverters = Collections.emptyMap();
if (physicalDataType instanceof MultipleRowType) {
- multipleRowConverters = new HashMap<>();
+ multipleTableRowConverters = new HashMap<>();
for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType)
physicalDataType) {
SeaTunnelRowDebeziumDeserializationConverters itemRowConverter
= new SeaTunnelRowDebeziumDeserializationConverters(
item.getValue(),
metadataConverters,
serverTimeZone,
userDefinedConverterFactory);
- multipleRowConverters.put(item.getKey(), itemRowConverter);
+ multipleTableRowConverters.put(item.getKey(),
itemRowConverter);
}
} else {
- singleRowConverter = new
SeaTunnelRowDebeziumDeserializationConverters(
+ singleTableRowConverter = new
SeaTunnelRowDebeziumDeserializationConverters(
(SeaTunnelRowType) physicalDataType,
metadataConverters,
serverTimeZone,
userDefinedConverterFactory
);
}
- this.singleRowConverter = singleRowConverter;
- this.multipleRowConverters = multipleRowConverters;
+ this.singleTableRowConverter = singleTableRowConverter;
+ this.multipleTableRowConverters = multipleTableRowConverters;
this.resultTypeInfo = checkNotNull(resultType);
this.validator = checkNotNull(validator);
}
@@ -114,7 +116,16 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
String tableName =
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
- SeaTunnelRowDebeziumDeserializationConverters converters =
multipleRowConverters.getOrDefault(tableName, singleRowConverter);
+ SeaTunnelRowDebeziumDeserializationConverters converters;
+ if (!multipleTableRowConverters.isEmpty()) {
+ converters = multipleTableRowConverters.get(tableName);
+ if (converters == null) {
+ log.debug("Ignore newly added table {}", tableName);
+ return;
+ }
+ } else {
+ converters = singleTableRowConverter;
+ }
if (operation == Envelope.Operation.CREATE || operation ==
Envelope.Operation.READ) {
SeaTunnelRow insert = extractAfterRow(converters, record,
messageStruct, valueSchema);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 23ad0aa59..013b351b9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -124,7 +124,7 @@ public class PendingCheckpoint implements Checkpoint {
for (ActionSubtaskState state : states) {
ActionState actionState = actionStates.get(state.getActionId());
if (actionState == null) {
- return;
+ continue;
}
stateSize +=
state.getState().stream().filter(Objects::nonNull).map(s -> s.length).count();
actionState.reportState(state.getIndex(), state);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 8863a70d3..704b35ff1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -201,13 +201,14 @@ public class ExecutionPlanGenerator {
String shuffleActionName = String.format("Shuffle [%s -> table[0~%s]]",
sourceAction.getName(), ((MultipleRowType)
sourceProducedType).getTableIds().length - 1);
ShuffleAction shuffleAction = new ShuffleAction(shuffleVertexId,
shuffleActionName, shuffleConfig);
- // multiple-row shuffle default parallelism is 1
- shuffleAction.setParallelism(1);
+ shuffleAction.setParallelism(sourceAction.getParallelism());
ExecutionVertex shuffleVertex = new ExecutionVertex(shuffleVertexId,
shuffleAction, shuffleAction.getParallelism());
ExecutionEdge sourceToShuffleEdge = new
ExecutionEdge(sourceExecutionVertex, shuffleVertex);
newExecutionEdges.add(sourceToShuffleEdge);
for (ExecutionVertex sinkVertex : sinkVertices) {
+ sinkVertex.setParallelism(1);
+ sinkVertex.getAction().setParallelism(1);
ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex,
sinkVertex);
newExecutionEdges.add(shuffleToSinkEdge);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index c55a14e7e..cdc2cac22 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -297,46 +297,46 @@ public class PhysicalPlanGenerator {
long taskIDPrefix = idGenerator.getNextId();
long taskGroupIDPrefix = idGenerator.getNextId();
- for (int parallelismIndex = 0; parallelismIndex <
flow.getAction().getParallelism(); parallelismIndex++) {
- ShuffleStrategy shuffleStrategyOfSinkFlow =
shuffleMultipleRowStrategy.toBuilder()
- .targetTableId(sinkTableId)
- .build();
- ShuffleConfig shuffleConfigOfSinkFlow =
shuffleConfig.toBuilder()
- .shuffleStrategy(shuffleStrategyOfSinkFlow)
- .build();
- long shuffleActionId = idGenerator.getNextId();
- String shuffleActionName = String.format("Shuffle
[table[%s] -> %s]", sinkTableIndex, sinkAction.getName());
- ShuffleAction shuffleActionOfSinkFlow = new
ShuffleAction(shuffleActionId, shuffleActionName, shuffleConfigOfSinkFlow);
- shuffleActionOfSinkFlow.setParallelism(1);
- PhysicalExecutionFlow shuffleFlow = new
PhysicalExecutionFlow(shuffleActionOfSinkFlow,
Collections.singletonList(sinkFlow));
- setFlowConfig(shuffleFlow, parallelismIndex);
-
- long taskGroupID =
mixIDPrefixAndIndex(taskGroupIDPrefix, parallelismIndex);
- TaskGroupLocation taskGroupLocation = new
TaskGroupLocation(
- jobImmutableInformation.getJobId(),
pipelineIndex, taskGroupID);
- TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskIDPrefix, parallelismIndex);
- SeaTunnelTask seaTunnelTask = new
TransformSeaTunnelTask(
- jobImmutableInformation.getJobId(),
taskLocation, parallelismIndex, shuffleFlow);
-
- // checkpoint
- fillCheckpointPlan(seaTunnelTask);
- physicalVertices.add(new PhysicalVertex(
- parallelismIndex,
- executorService,
- shuffleFlow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupLocation,
shuffleFlow.getAction().getName() +
- "-ShuffleTask",
- Collections.singletonList(seaTunnelTask)),
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- seaTunnelTask.getJarsUrl(),
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine,
- runningJobStateIMap,
- runningJobStateTimestampsIMap));
- }
+ int parallelismIndex = 0;
+
+ ShuffleStrategy shuffleStrategyOfSinkFlow =
shuffleMultipleRowStrategy.toBuilder()
+ .targetTableId(sinkTableId)
+ .build();
+ ShuffleConfig shuffleConfigOfSinkFlow =
shuffleConfig.toBuilder()
+ .shuffleStrategy(shuffleStrategyOfSinkFlow)
+ .build();
+ long shuffleActionId = idGenerator.getNextId();
+ String shuffleActionName = String.format("Shuffle
[table[%s] -> %s]", sinkTableIndex, sinkAction.getName());
+ ShuffleAction shuffleActionOfSinkFlow = new
ShuffleAction(shuffleActionId, shuffleActionName, shuffleConfigOfSinkFlow);
+ shuffleActionOfSinkFlow.setParallelism(1);
+ PhysicalExecutionFlow shuffleFlow = new
PhysicalExecutionFlow(shuffleActionOfSinkFlow,
Collections.singletonList(sinkFlow));
+ setFlowConfig(shuffleFlow, parallelismIndex);
+
+ long taskGroupID =
mixIDPrefixAndIndex(taskGroupIDPrefix, parallelismIndex);
+ TaskGroupLocation taskGroupLocation = new
TaskGroupLocation(
+ jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
+ TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskIDPrefix, parallelismIndex);
+ SeaTunnelTask seaTunnelTask = new
TransformSeaTunnelTask(
+ jobImmutableInformation.getJobId(), taskLocation,
parallelismIndex, shuffleFlow);
+
+ // checkpoint
+ fillCheckpointPlan(seaTunnelTask);
+ physicalVertices.add(new PhysicalVertex(
+ parallelismIndex,
+ executorService,
+ shuffleFlow.getAction().getParallelism(),
+ new TaskGroupDefaultImpl(taskGroupLocation,
shuffleFlow.getAction().getName() +
+ "-ShuffleTask",
+ Collections.singletonList(seaTunnelTask)),
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ seaTunnelTask.getJarsUrl(),
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap));
}
} else {
long taskIDPrefix = idGenerator.getNextId();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 0014eb9f8..f29fa39f5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -34,7 +34,6 @@ import
org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -215,14 +214,13 @@ public abstract class SeaTunnelTask extends AbstractTask {
new SeaTunnelTransformCollector(flowLifeCycles),
completableFuture);
} else if (f.getAction() instanceof ShuffleAction) {
ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
- ShuffleConfig shuffleConfig = shuffleAction.getConfig();
HazelcastInstance hazelcastInstance =
getExecutionContext().getInstance();
if (flow.getNext().isEmpty()) {
lifeCycle = new ShuffleSinkFlowLifeCycle(
- this, indexID, shuffleConfig, hazelcastInstance,
completableFuture);
+ this, indexID, shuffleAction, hazelcastInstance,
completableFuture);
} else {
lifeCycle = new ShuffleSourceFlowLifeCycle(
- this, indexID, shuffleConfig, hazelcastInstance,
completableFuture);
+ this, indexID, shuffleAction, hazelcastInstance,
completableFuture);
}
outputs = flowLifeCycles;
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 546ee782c..5bad7e239 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -28,6 +28,7 @@ import com.hazelcast.core.HazelcastInstance;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements
OneInputFlowLifeCycle<Record<?>> {
private final int pipelineId;
private final int taskIndex;
+ private final ShuffleAction shuffleAction;
private final Map<String, IQueue<Record<?>>> shuffles;
private final int shuffleBatchSize;
private final long shuffleBatchFlushInterval;
@@ -51,16 +53,17 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
public ShuffleSinkFlowLifeCycle(SeaTunnelTask runningTask,
int taskIndex,
- ShuffleConfig shuffleConfig,
+ ShuffleAction shuffleAction,
HazelcastInstance hazelcastInstance,
CompletableFuture<Void> completableFuture)
{
super(runningTask, completableFuture);
this.pipelineId =
runningTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
this.taskIndex = taskIndex;
- this.shuffleStrategy = shuffleConfig.getShuffleStrategy();
+ this.shuffleAction = shuffleAction;
+ this.shuffleStrategy = shuffleAction.getConfig().getShuffleStrategy();
this.shuffles = shuffleStrategy.createShuffles(hazelcastInstance,
pipelineId, taskIndex);
- this.shuffleBatchSize = shuffleConfig.getBatchSize();
- this.shuffleBatchFlushInterval = shuffleConfig.getBatchFlushInterval();
+ this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
+ this.shuffleBatchFlushInterval =
shuffleAction.getConfig().getBatchFlushInterval();
this.shuffleBuffer = new HashMap<>();
}
@@ -71,10 +74,14 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
shuffleFlush();
Barrier barrier = (Barrier) record.getData();
- runningTask.ack(barrier);
if (barrier.prepareClose()) {
prepareClose = true;
}
+ if (barrier.snapshot()) {
+ runningTask.addState(barrier, shuffleAction.getId(),
Collections.emptyList());
+ }
+ runningTask.ack(barrier);
+
// The barrier needs to be replicated to all channels
for (Map.Entry<String, IQueue<Record<?>>> shuffle :
shuffles.entrySet()) {
IQueue<Record<?>> shuffleQueue = shuffle.getValue();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 43c450f32..6720c9a7b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -27,6 +27,7 @@ import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
@SuppressWarnings("MagicNumber")
public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
implements OneOutputFlowLifeCycle<Record<?>> {
+ private final ShuffleAction shuffleAction;
private final int shuffleBatchSize;
private final IQueue<Record<?>>[] shuffles;
private List<Record<?>> unsentBuffer;
@@ -44,13 +46,15 @@ public class ShuffleSourceFlowLifeCycle<T> extends
AbstractFlowLifeCycle impleme
public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask,
int taskIndex,
- ShuffleConfig shuffleConfig,
+ ShuffleAction shuffleAction,
HazelcastInstance hazelcastInstance,
CompletableFuture<Void>
completableFuture) {
super(runningTask, completableFuture);
int pipelineId = runningTask.getTaskLocation().getPipelineId();
- this.shuffles =
shuffleConfig.getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId,
taskIndex);
- this.shuffleBatchSize = shuffleConfig.getBatchSize();
+ this.shuffleAction = shuffleAction;
+ this.shuffles = shuffleAction.getConfig()
+ .getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId,
taskIndex);
+ this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
}
@Override
@@ -87,11 +91,16 @@ public class ShuffleSourceFlowLifeCycle<T> extends
AbstractFlowLifeCycle impleme
// publish barrier
if (alignedBarriersCounter == shuffles.length) {
- runningTask.ack(barrier);
if (barrier.prepareClose()) {
prepareClose = true;
}
+ if (barrier.snapshot()) {
+ runningTask.addState(barrier,
shuffleAction.getId(), Collections.emptyList());
+ }
+ runningTask.ack(barrier);
+
collector.collect(record);
+
alignedBarriersCounter = 0;
alignedBarriers.clear();
}