This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c9b9fff26 [Improve] [Engine] Move Completed Checkpoint trigger to CheckpointCoordinator (#3514)
c9b9fff26 is described below
commit c9b9fff26dcb1f1e507a1b6991a436e001fc9362
Author: Hisoka <[email protected]>
AuthorDate: Mon Nov 28 20:33:57 2022 +0800
[Improve] [Engine] Move Completed Checkpoint trigger to CheckpointCoordinator (#3514)
* [Engine] [CheckPoint] Move Completed Checkpoint trigger to CheckpointCoordinator
* [Checkpoint] Add Checkpoint for multi source
---
.../server/checkpoint/CheckpointCoordinator.java | 107 ++++++++++++++-------
.../server/checkpoint/CheckpointManager.java | 9 ++
.../serializable/TaskDataSerializerHook.java | 5 +
.../engine/server/task/SeaTunnelTask.java | 3 +
.../server/task/SinkAggregatedCommitterTask.java | 2 +
.../server/task/SourceSplitEnumeratorTask.java | 6 +-
.../source/LastCheckpointNotifyOperation.java | 72 ++++++++++++++
.../engine/server/task/record/Barrier.java | 8 --
8 files changed, 168 insertions(+), 44 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index fc2ec8a15..f7855bc59 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
import static
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
+import static
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.CHECKPOINT_TYPE;
import static
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
import static
org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
@@ -29,6 +30,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerialize
import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -101,6 +103,7 @@ public class CheckpointCoordinator {
private final CheckpointPlan plan;
+ private final Set<TaskLocation> readyToCloseStartingTask;
private final ConcurrentHashMap<Long, PendingCheckpoint>
pendingCheckpoints;
private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
@@ -153,6 +156,7 @@ public class CheckpointCoordinator {
this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks());
this.pipelineTaskStatus = new ConcurrentHashMap<>();
this.checkpointIdCounter = checkpointIdCounter;
+ this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
if (pipelineState != null) {
this.latestCompletedCheckpoint =
serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class);
}
@@ -228,18 +232,50 @@ public class CheckpointCoordinator {
}
private void scheduleTriggerPendingCheckpoint(long delayMills) {
- scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills,
TimeUnit.MILLISECONDS);
+ scheduler.schedule(() -> tryTriggerPendingCheckpoint(), delayMills,
TimeUnit.MILLISECONDS);
}
protected void tryTriggerPendingCheckpoint() {
+ tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
+ }
+
+ protected void readyToClose(TaskLocation taskLocation) {
+ readyToCloseStartingTask.add(taskLocation);
+ if (readyToCloseStartingTask.size() ==
plan.getStartingSubtasks().size()) {
+ tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
+ }
+ }
+
+ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
synchronized (lock) {
+ if (isCompleted() || isShutdown()) {
+ LOG.warn(String.format("can't trigger checkpoint with type:
%s, because checkpoint coordinator already have completed checkpoint",
checkpointType));
+ return;
+ }
final long currentTimestamp = Instant.now().toEpochMilli();
- if (currentTimestamp - latestTriggerTimestamp.get() >=
coordinatorConfig.getCheckpointInterval() &&
- pendingCounter.get() <
coordinatorConfig.getMaxConcurrentCheckpoints() && isAllTaskReady) {
- CompletableFuture<PendingCheckpoint> pendingCheckpoint =
createPendingCheckpoint(currentTimestamp, CheckpointType.CHECKPOINT_TYPE);
- startTriggerPendingCheckpoint(pendingCheckpoint);
- pendingCounter.incrementAndGet();
-
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+ if (checkpointType.equals(CHECKPOINT_TYPE)) {
+ if (currentTimestamp - latestTriggerTimestamp.get() <
coordinatorConfig.getCheckpointInterval() ||
+ pendingCounter.get() >=
coordinatorConfig.getMaxConcurrentCheckpoints() || !isAllTaskReady) {
+ return;
+ }
+ } else {
+ shutdown = true;
+ waitingPendingCheckpointDone();
+ }
+ CompletableFuture<PendingCheckpoint> pendingCheckpoint =
createPendingCheckpoint(currentTimestamp, checkpointType);
+ startTriggerPendingCheckpoint(pendingCheckpoint);
+ pendingCounter.incrementAndGet();
+
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void waitingPendingCheckpointDone() {
+ while (pendingCounter.get() != 0) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new SeaTunnelEngineException(e);
}
}
}
@@ -271,23 +307,21 @@ public class CheckpointCoordinator {
PassiveCompletableFuture<CompletedCheckpoint> completableFuture =
pendingCheckpoint.getCompletableFuture();
completableFuture.thenAcceptAsync(this::completePendingCheckpoint);
- if (COMPLETED_POINT_TYPE != pendingCheckpoint.getCheckpointType())
{
- // Trigger the barrier and wait for all tasks to ACK
- LOG.debug("trigger checkpoint barrier {}/{}/{}, {}",
pendingCheckpoint.getJobId(), pendingCheckpoint.getPipelineId(),
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointType());
- CompletableFuture<InvocationFuture<?>[]>
completableFutureArray = CompletableFuture.supplyAsync(() ->
- new
CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
- pendingCheckpoint.getCheckpointTimestamp(),
- pendingCheckpoint.getCheckpointType()))
- .thenApplyAsync(this::triggerCheckpoint);
-
- try {
- CompletableFuture.allOf(completableFutureArray).get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (Exception e) {
- LOG.error(ExceptionUtils.getMessage(e));
- return;
- }
+ // Trigger the barrier and wait for all tasks to ACK
+ LOG.debug("trigger checkpoint barrier {}/{}/{}, {}",
pendingCheckpoint.getJobId(), pendingCheckpoint.getPipelineId(),
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointType());
+ CompletableFuture<InvocationFuture<?>[]> completableFutureArray =
CompletableFuture.supplyAsync(() ->
+ new CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
+ pendingCheckpoint.getCheckpointTimestamp(),
+ pendingCheckpoint.getCheckpointType()))
+ .thenApplyAsync(this::triggerCheckpoint);
+
+ try {
+ CompletableFuture.allOf(completableFutureArray).get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ LOG.error(ExceptionUtils.getMessage(e));
+ return;
}
LOG.info("Start a scheduled task to prevent checkpoint timeouts");
@@ -307,16 +341,21 @@ public class CheckpointCoordinator {
CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long
triggerTimestamp, CheckpointType checkpointType) {
synchronized (lock) {
- CompletableFuture<Long> idFuture =
CompletableFuture.supplyAsync(() -> {
- try {
- // this must happen outside the coordinator-wide lock,
- // because it communicates with external services
- // (in HA mode) and may block for a while.
- return checkpointIdCounter.getAndIncrement();
- } catch (Throwable e) {
- throw new CompletionException(e);
- }
- });
+ CompletableFuture<Long> idFuture;
+ if (!checkpointType.equals(COMPLETED_POINT_TYPE)) {
+ idFuture = CompletableFuture.supplyAsync(() -> {
+ try {
+ // this must happen outside the coordinator-wide lock,
+ // because it communicates with external services
+ // (in HA mode) and may block for a while.
+ return checkpointIdCounter.getAndIncrement();
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ }
+ });
+ } else {
+ idFuture = CompletableFuture.supplyAsync(() ->
Barrier.PREPARE_CLOSE_BARRIER_ID);
+ }
return triggerPendingCheckpoint(triggerTimestamp, idFuture,
checkpointType);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 78829e4dc..688fe0a42 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
@@ -167,6 +168,14 @@ public class CheckpointManager {
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}
+ /**
+ * Called by the {@link SourceSplitEnumeratorTask}.
+ * <br> used by SourceSplitEnumeratorTask to tell CheckpointCoordinator
pipeline will trigger close barrier by SourceSplitEnumeratorTask.
+ */
+ public void readyToClose(TaskLocation taskLocation) {
+ getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
+ }
+
/**
* Called by the JobMaster.
* <br> Listen to the {@link PipelineStatus} of the {@link SubPlan}, which
is used to cancel the running {@link PendingCheckpoint} when the SubPlan is
abnormal.
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 2161c6cbe..5ea42d78e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequest
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
@@ -72,6 +73,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
public static final int BARRIER_FLOW_OPERATOR = 16;
+ public static final int LAST_CHECKPOINT_NOTIFY = 17;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -124,6 +127,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
return new NotifyTaskStatusOperation();
case BARRIER_FLOW_OPERATOR:
return new BarrierFlowOperation();
+ case LAST_CHECKPOINT_NOTIFY:
+ return new LastCheckpointNotifyOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index f614b84f7..69123b334 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -122,6 +122,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
CompletableFuture.allOf(flowFutures.toArray(new
CompletableFuture[0])).whenComplete((s, e) -> closeCalled = true);
}
+ @SuppressWarnings("checkstyle:MagicNumber")
protected void stateProcess() throws Exception {
switch (currState) {
case INIT:
@@ -154,6 +155,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
case PREPARE_CLOSE:
if (closeCalled) {
currState = CLOSED;
+ } else {
+ Thread.sleep(100);
}
break;
case CLOSED:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 6fa251b99..dc42be5fd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -150,6 +150,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
case PREPARE_CLOSE:
if (closeCalled) {
currState = CLOSED;
+ } else {
+ Thread.sleep(100);
}
break;
case CLOSED:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 2b2d2befb..d616cc273 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
import
org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
+import
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
@@ -234,8 +235,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
case RUNNING:
// The reader closes automatically after reading
if (prepareCloseStatus) {
- // TODO we should trigger this after CheckpointCoordinator
done
- triggerBarrier(Barrier.completedBarrier());
+ this.getExecutionContext().sendToMaster(new
LastCheckpointNotifyOperation(jobID, taskLocation));
currState = PREPARE_CLOSE;
} else {
Thread.sleep(100);
@@ -244,6 +244,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
case PREPARE_CLOSE:
if (closeCalled) {
currState = CLOSED;
+ } else {
+ Thread.sleep(100);
}
break;
case CLOSED:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/LastCheckpointNotifyOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/LastCheckpointNotifyOperation.java
new file mode 100644
index 000000000..e0608b50b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/LastCheckpointNotifyOperation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.source;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class LastCheckpointNotifyOperation extends Operation implements
IdentifiedDataSerializable {
+
+ private long jobId;
+ private TaskLocation taskLocation;
+
+ public LastCheckpointNotifyOperation() {
+
+ }
+
+ public LastCheckpointNotifyOperation(long jobId, TaskLocation
taskLocation) {
+ this.jobId = jobId;
+ this.taskLocation = taskLocation;
+ }
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+
server.getCoordinatorService().getJobMaster(jobId).getCheckpointManager().readyToClose(taskLocation);
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ out.writeLong(jobId);
+ out.writeObject(taskLocation);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ jobId = in.readLong();
+ taskLocation = in.readObject();
+ }
+
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.LAST_CHECKPOINT_NOTIFY;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
index 5985fd2f4..2ace7c635 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
@@ -17,11 +17,6 @@
package org.apache.seatunnel.engine.server.task.record;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
-
-import java.time.Instant;
-
/**
* barrier flowing in data flow
*/
@@ -44,7 +39,4 @@ public interface Barrier {
*/
boolean prepareClose();
- static CheckpointBarrier completedBarrier() {
- return new CheckpointBarrier(Barrier.PREPARE_CLOSE_BARRIER_ID,
Instant.now().toEpochMilli(), CheckpointType.COMPLETED_POINT_TYPE);
- }
}