This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c9b9fff26 [Improve] [Engine] Move Completed Checkpoint trigger to 
CheckpointCoordinator (#3514)
c9b9fff26 is described below

commit c9b9fff26dcb1f1e507a1b6991a436e001fc9362
Author: Hisoka <[email protected]>
AuthorDate: Mon Nov 28 20:33:57 2022 +0800

    [Improve] [Engine] Move Completed Checkpoint trigger to 
CheckpointCoordinator (#3514)
    
    * [Engine] [CheckPoint] Move Completed Checkpoint trigger to 
CheckpointCoordinator
    
    * [Checkpoint] Add Checkpoint for multi source
---
 .../server/checkpoint/CheckpointCoordinator.java   | 107 ++++++++++++++-------
 .../server/checkpoint/CheckpointManager.java       |   9 ++
 .../serializable/TaskDataSerializerHook.java       |   5 +
 .../engine/server/task/SeaTunnelTask.java          |   3 +
 .../server/task/SinkAggregatedCommitterTask.java   |   2 +
 .../server/task/SourceSplitEnumeratorTask.java     |   6 +-
 .../source/LastCheckpointNotifyOperation.java      |  72 ++++++++++++++
 .../engine/server/task/record/Barrier.java         |   8 --
 8 files changed, 168 insertions(+), 44 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index fc2ec8a15..f7855bc59 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import static 
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
+import static 
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.CHECKPOINT_TYPE;
 import static 
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
 import static 
org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
 import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
@@ -29,6 +30,7 @@ import 
org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerialize
 import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -101,6 +103,7 @@ public class CheckpointCoordinator {
 
     private final CheckpointPlan plan;
 
+    private final Set<TaskLocation> readyToCloseStartingTask;
     private final ConcurrentHashMap<Long, PendingCheckpoint> 
pendingCheckpoints;
 
     private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
@@ -153,6 +156,7 @@ public class CheckpointCoordinator {
         this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks());
         this.pipelineTaskStatus = new ConcurrentHashMap<>();
         this.checkpointIdCounter = checkpointIdCounter;
+        this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
         if (pipelineState != null) {
             this.latestCompletedCheckpoint = 
serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class);
         }
@@ -228,18 +232,50 @@ public class CheckpointCoordinator {
     }
 
     private void scheduleTriggerPendingCheckpoint(long delayMills) {
-        scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills, 
TimeUnit.MILLISECONDS);
+        scheduler.schedule(() -> tryTriggerPendingCheckpoint(), delayMills, 
TimeUnit.MILLISECONDS);
     }
 
     protected void tryTriggerPendingCheckpoint() {
+        tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
+    }
+
+    protected void readyToClose(TaskLocation taskLocation) {
+        readyToCloseStartingTask.add(taskLocation);
+        if (readyToCloseStartingTask.size() == 
plan.getStartingSubtasks().size()) {
+            tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
+        }
+    }
+
+    protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
         synchronized (lock) {
+            if (isCompleted() || isShutdown()) {
+                LOG.warn(String.format("can't trigger checkpoint with type: 
%s, because checkpoint coordinator already have completed checkpoint", 
checkpointType));
+                return;
+            }
             final long currentTimestamp = Instant.now().toEpochMilli();
-            if (currentTimestamp - latestTriggerTimestamp.get() >= 
coordinatorConfig.getCheckpointInterval() &&
-                pendingCounter.get() < 
coordinatorConfig.getMaxConcurrentCheckpoints() && isAllTaskReady) {
-                CompletableFuture<PendingCheckpoint> pendingCheckpoint = 
createPendingCheckpoint(currentTimestamp, CheckpointType.CHECKPOINT_TYPE);
-                startTriggerPendingCheckpoint(pendingCheckpoint);
-                pendingCounter.incrementAndGet();
-                
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+            if (checkpointType.equals(CHECKPOINT_TYPE)) {
+                if (currentTimestamp - latestTriggerTimestamp.get() < 
coordinatorConfig.getCheckpointInterval() ||
+                    pendingCounter.get() >= 
coordinatorConfig.getMaxConcurrentCheckpoints() || !isAllTaskReady) {
+                    return;
+                }
+            } else {
+                shutdown = true;
+                waitingPendingCheckpointDone();
+            }
+            CompletableFuture<PendingCheckpoint> pendingCheckpoint = 
createPendingCheckpoint(currentTimestamp, checkpointType);
+            startTriggerPendingCheckpoint(pendingCheckpoint);
+            pendingCounter.incrementAndGet();
+            
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void waitingPendingCheckpointDone() {
+        while (pendingCounter.get() != 0) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                throw new SeaTunnelEngineException(e);
             }
         }
     }
@@ -271,23 +307,21 @@ public class CheckpointCoordinator {
             PassiveCompletableFuture<CompletedCheckpoint> completableFuture = 
pendingCheckpoint.getCompletableFuture();
             completableFuture.thenAcceptAsync(this::completePendingCheckpoint);
 
-            if (COMPLETED_POINT_TYPE != pendingCheckpoint.getCheckpointType()) 
{
-                // Trigger the barrier and wait for all tasks to ACK
-                LOG.debug("trigger checkpoint barrier {}/{}/{}, {}", 
pendingCheckpoint.getJobId(), pendingCheckpoint.getPipelineId(), 
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointType());
-                CompletableFuture<InvocationFuture<?>[]> 
completableFutureArray = CompletableFuture.supplyAsync(() ->
-                        new 
CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
-                            pendingCheckpoint.getCheckpointTimestamp(),
-                            pendingCheckpoint.getCheckpointType()))
-                    .thenApplyAsync(this::triggerCheckpoint);
-
-                try {
-                    CompletableFuture.allOf(completableFutureArray).get();
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                } catch (Exception e) {
-                    LOG.error(ExceptionUtils.getMessage(e));
-                    return;
-                }
+            // Trigger the barrier and wait for all tasks to ACK
+            LOG.debug("trigger checkpoint barrier {}/{}/{}, {}", 
pendingCheckpoint.getJobId(), pendingCheckpoint.getPipelineId(), 
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointType());
+            CompletableFuture<InvocationFuture<?>[]> completableFutureArray = 
CompletableFuture.supplyAsync(() ->
+                    new CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
+                        pendingCheckpoint.getCheckpointTimestamp(),
+                        pendingCheckpoint.getCheckpointType()))
+                .thenApplyAsync(this::triggerCheckpoint);
+
+            try {
+                CompletableFuture.allOf(completableFutureArray).get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (Exception e) {
+                LOG.error(ExceptionUtils.getMessage(e));
+                return;
             }
 
             LOG.info("Start a scheduled task to prevent checkpoint timeouts");
@@ -307,16 +341,21 @@ public class CheckpointCoordinator {
 
     CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long 
triggerTimestamp, CheckpointType checkpointType) {
         synchronized (lock) {
-            CompletableFuture<Long> idFuture = 
CompletableFuture.supplyAsync(() -> {
-                try {
-                    // this must happen outside the coordinator-wide lock,
-                    // because it communicates with external services
-                    // (in HA mode) and may block for a while.
-                    return checkpointIdCounter.getAndIncrement();
-                } catch (Throwable e) {
-                    throw new CompletionException(e);
-                }
-            });
+            CompletableFuture<Long> idFuture;
+            if (!checkpointType.equals(COMPLETED_POINT_TYPE)) {
+                idFuture = CompletableFuture.supplyAsync(() -> {
+                    try {
+                        // this must happen outside the coordinator-wide lock,
+                        // because it communicates with external services
+                        // (in HA mode) and may block for a while.
+                        return checkpointIdCounter.getAndIncrement();
+                    } catch (Throwable e) {
+                        throw new CompletionException(e);
+                    }
+                });
+            } else {
+                idFuture = CompletableFuture.supplyAsync(() -> 
Barrier.PREPARE_CLOSE_BARRIER_ID);
+            }
             return triggerPendingCheckpoint(triggerTimestamp, idFuture, 
checkpointType);
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 78829e4dc..688fe0a42 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
@@ -167,6 +168,14 @@ public class CheckpointManager {
         
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
     }
 
+    /**
+     * Called by the {@link SourceSplitEnumeratorTask}.
+     * <br> Used by SourceSplitEnumeratorTask to tell the CheckpointCoordinator 
that the task is ready to close, so that the coordinator can trigger the close barrier.
+     */
+    public void readyToClose(TaskLocation taskLocation) {
+        getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
+    }
+
     /**
      * Called by the JobMaster.
      * <br> Listen to the {@link PipelineStatus} of the {@link SubPlan}, which 
is used to cancel the running {@link PendingCheckpoint} when the SubPlan is 
abnormal.
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 2161c6cbe..5ea42d78e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequest
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import 
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
@@ -72,6 +73,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
 
     public static final int BARRIER_FLOW_OPERATOR = 16;
 
+    public static final int LAST_CHECKPOINT_NOTIFY = 17;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -124,6 +127,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
                     return new NotifyTaskStatusOperation();
                 case BARRIER_FLOW_OPERATOR:
                     return new BarrierFlowOperation();
+                case LAST_CHECKPOINT_NOTIFY:
+                    return new LastCheckpointNotifyOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index f614b84f7..69123b334 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -122,6 +122,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
         CompletableFuture.allOf(flowFutures.toArray(new 
CompletableFuture[0])).whenComplete((s, e) -> closeCalled = true);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     protected void stateProcess() throws Exception {
         switch (currState) {
             case INIT:
@@ -154,6 +155,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
             case PREPARE_CLOSE:
                 if (closeCalled) {
                     currState = CLOSED;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case CLOSED:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 6fa251b99..dc42be5fd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -150,6 +150,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT> ex
             case PREPARE_CLOSE:
                 if (closeCalled) {
                     currState = CLOSED;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case CLOSED:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 2b2d2befb..d616cc273 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import 
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
 import 
org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
+import 
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
@@ -234,8 +235,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             case RUNNING:
                 // The reader closes automatically after reading
                 if (prepareCloseStatus) {
-                    // TODO we should trigger this after CheckpointCoordinator 
done
-                    triggerBarrier(Barrier.completedBarrier());
+                    this.getExecutionContext().sendToMaster(new 
LastCheckpointNotifyOperation(jobID, taskLocation));
                     currState = PREPARE_CLOSE;
                 } else {
                     Thread.sleep(100);
@@ -244,6 +244,8 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             case PREPARE_CLOSE:
                 if (closeCalled) {
                     currState = CLOSED;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case CLOSED:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/LastCheckpointNotifyOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/LastCheckpointNotifyOperation.java
new file mode 100644
index 000000000..e0608b50b
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/LastCheckpointNotifyOperation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.source;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class LastCheckpointNotifyOperation extends Operation implements 
IdentifiedDataSerializable {
+
+    private long jobId;
+    private TaskLocation taskLocation;
+
+    public LastCheckpointNotifyOperation() {
+
+    }
+
+    public LastCheckpointNotifyOperation(long jobId, TaskLocation 
taskLocation) {
+        this.jobId = jobId;
+        this.taskLocation = taskLocation;
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        
server.getCoordinatorService().getJobMaster(jobId).getCheckpointManager().readyToClose(taskLocation);
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        out.writeLong(jobId);
+        out.writeObject(taskLocation);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        jobId = in.readLong();
+        taskLocation = in.readObject();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.LAST_CHECKPOINT_NOTIFY;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
index 5985fd2f4..2ace7c635 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
@@ -17,11 +17,6 @@
 
 package org.apache.seatunnel.engine.server.task.record;
 
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
-
-import java.time.Instant;
-
 /**
  * barrier flowing in data flow
  */
@@ -44,7 +39,4 @@ public interface Barrier {
      */
     boolean prepareClose();
 
-    static CheckpointBarrier completedBarrier() {
-        return new CheckpointBarrier(Barrier.PREPARE_CLOSE_BARRIER_ID, 
Instant.now().toEpochMilli(), CheckpointType.COMPLETED_POINT_TYPE);
-    }
 }

Reply via email to