1996fanrui commented on code in PR #28554:
URL: https://github.com/apache/flink/pull/28554#discussion_r3486029637


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveryCheckpointTrigger.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+@Internal
+public interface RecoveryCheckpointTrigger {
+
+    /**
+     * Atomically snapshots the undrained spill slice and inserts matching 
{@link
+     * RecoveryCheckpointBarrier}s into in-recovery channels. Returns an 
independent reader over the
+     * remaining segments; the caller owns and must close it.
+     */
+    FetchedChannelStateReader snapshotAndInsertBarriers(long checkpointId) 
throws IOException;
+
+    /** Returns an empty reader (no spill files, so no segments) and inserts 
no barriers. */
+    RecoveryCheckpointTrigger NO_OP = checkpointId -> 
FetchedChannelStateReader.emptyReader();
+
+    RecoveryCheckpointTrigger NOT_READY =
+            ign -> {
+                throw new IllegalStateException("RecoveryCheckpointTrigger is 
not ready yet");
+            };
+    RecoveryCheckpointTrigger FAILING =

Review Comment:
   Not usage



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java:
##########
@@ -190,14 +198,16 @@ public String toString() {
 
     public abstract void requestPartitions() throws IOException;
 
-    public abstract CompletableFuture<Void> getStateConsumedFuture();
-
     /**
-     * Returns a future that completes when buffer filtering is complete for 
all channels. This
-     * future completes before {@link #getStateConsumedFuture()}, enabling 
earlier RUNNING state
-     * transition when unaligned checkpoint during recovery is enabled.
+     * Requests the partitions. {@code needsRecovery} controls whether 
converted physical channels
+     * start in recovery (i.e. with no credit / no floating buffers until the 
recovered channel
+     * state has been drained). The default implementation ignores the flag.
      */
-    public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture();
+    public void requestPartitions(boolean needsRecovery) throws IOException {
+        requestPartitions();
+    }

Review Comment:
   Instead of keeping 2 methods, could `void requestPartitions() ` be replaced 
by`requestPartitions(boolean needsRecovery)`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImpl.java:
##########
@@ -69,29 +75,38 @@ public void readInputData(InputGate[] inputGates, 
RecordFilterContext filterCont
                         ? 
ChannelStateFilteringHandler.createFromContext(filterContext, inputGates)
                         : null;
 
-        try (ChannelStateFilteringHandler ignored = filteringHandler;
-                InputChannelRecoveredStateHandler stateHandler =
-                        new InputChannelRecoveredStateHandler(
-                                inputGates,
-                                
taskStateSnapshot.getInputRescalingDescriptor(),
-                                filteringHandler,
-                                filterContext.getMemorySegmentSize())) {
-            read(
-                    stateHandler,
-                    groupByDelegate(
-                            streamSubtaskStates(),
-                            ChannelStateHelper::extractUnmergedInputHandles));
-            read(
-                    stateHandler,
-                    groupByDelegate(
-                            streamSubtaskStates(),
-                            
OperatorSubtaskState::getUpstreamOutputBufferState));
+        // Manual close ordering so the produced spill file can be published 
after
+        // stateHandler.close() flushes the filter writer.
+        AbstractInputChannelRecoveredStateHandler stateHandler =
+                AbstractInputChannelRecoveredStateHandler.create(
+                        inputGates,
+                        taskStateSnapshot.getInputRescalingDescriptor(),
+                        filterContext.isCheckpointingDuringRecoveryEnabled(),
+                        filteringHandler,
+                        filterContext.getMemorySegmentSize(),
+                        filterContext.getTmpDirectories());
+        try (ChannelStateFilteringHandler ignored = filteringHandler) {
+            try (stateHandler) {

Review Comment:
   One try seems enough here
   
   ```suggestion
           try (ChannelStateFilteringHandler ignored = filteringHandler;
                AbstractInputChannelRecoveredStateHandler stateHandler =
                        AbstractInputChannelRecoveredStateHandler.create(
                                inputGates,
                                taskStateSnapshot.getInputRescalingDescriptor(),
                                
filterContext.isCheckpointingDuringRecoveryEnabled(),
                                filteringHandler,
                                filterContext.getMemorySegmentSize(),
                                filterContext.getTmpDirectories())) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -332,32 +334,12 @@ public CompletableFuture<Void> getStateConsumedFuture() {
     }
 
     @Override
-    public void setCheckpointingDuringRecoveryEnabled(boolean enabled) {
-        this.checkpointingDuringRecoveryEnabled = enabled;
-    }
-
-    @Override
-    public boolean isCheckpointingDuringRecoveryEnabled() {
-        return checkpointingDuringRecoveryEnabled;
-    }
-
-    @Override
-    public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
-        synchronized (requestLock) {
-            List<CompletableFuture<?>> futures = new 
ArrayList<>(numberOfInputChannels);
-            for (InputChannel inputChannel : inputChannels()) {
-                if (inputChannel instanceof RecoveredInputChannel) {
-                    futures.add(
-                            ((RecoveredInputChannel) inputChannel)
-                                    .getBufferFilteringCompleteFuture());
-                }
-            }
-            return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0]));
-        }
+    public void requestPartitions() {
+        requestPartitions(false);
     }
 
     @Override
-    public void requestPartitions() {
+    public void requestPartitions(boolean needsRecovery) {

Review Comment:
   Same comment with 
https://github.com/apache/flink/pull/28554/changes#r3486064582
   
   



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RecordFilterContext.java:
##########
@@ -130,8 +131,17 @@ public RecordFilterContext(
         this.rescalingDescriptor = checkNotNull(rescalingDescriptor);
         this.subtaskIndex = subtaskIndex;
         this.maxParallelism = maxParallelism;
-        this.tmpDirectories = tmpDirectories != null ? tmpDirectories : new 
String[0];
         this.checkpointingDuringRecoveryEnabled = 
checkpointingDuringRecoveryEnabled;
+        if (checkpointingDuringRecoveryEnabled) {
+            // tmpDirectories are only used by the spilling 
(checkpointing-during-recovery) path.
+            checkArgument(
+                    checkNotNull(tmpDirectories).length > 0, "tmpDirectories 
must not be empty");
+            this.tmpDirectories = tmpDirectories.clone();
+        } else {
+            // A disabled context never spills, so it needs no spill 
directories; tolerate a
+            // null/empty value (e.g. an environment without IOManager 
spilling directories).
+            this.tmpDirectories = tmpDirectories == null ? new String[0] : 
tmpDirectories.clone();
+        }

Review Comment:
   In my previous commits, check rule is `checkNotNull(tmpDirectories).length > 
0`, why do we tolerate `tmpDirectories is null`?
   
   Is it only for testing?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java:
##########
@@ -113,48 +157,220 @@ public LocalInputChannel(
 
         this.partitionManager = checkNotNull(partitionManager);
         this.taskEventPublisher = checkNotNull(taskEventPublisher);
-        this.channelStatePersister = new ChannelStatePersister(stateWriter, 
getChannelInfo());
-
-        // Migrate recovered buffers from RecoveredInputChannel if provided.
-        // These buffers have been filtered but not yet consumed by the Task.
-        if (!initialRecoveredBuffers.isEmpty()) {
-            final int expectedCount = initialRecoveredBuffers.size();
-            // Sequence number starts at Integer.MIN_VALUE, consistent with 
RecoveredInputChannel.
-            int seqNum = Integer.MIN_VALUE;
-            while (!initialRecoveredBuffers.isEmpty()) {
-                Buffer buffer = initialRecoveredBuffers.poll();
-                // Determine next data type based on the next buffer in the 
queue
-                Buffer.DataType nextDataType =
-                        initialRecoveredBuffers.isEmpty()
-                                ? Buffer.DataType.NONE
-                                : initialRecoveredBuffers.peek().getDataType();
-                // buffersInBacklog is set to 0 as these are recovered buffers
-                BufferAndBacklog bufferAndBacklog =
-                        new BufferAndBacklog(buffer, 0, nextDataType, 
seqNum++);
-                toBeConsumedBuffers.add(bufferAndBacklog);
+        this.channelStatePersister =
+                new ChannelStatePersister(checkNotNull(stateWriter), 
getChannelInfo());
+        this.inRecovery = needsRecovery;
+        this.bufferManager =
+                needsRecovery
+                        // review nit: false for consistency?
+                        ? new 
BufferManager(inputGate.getMemorySegmentProvider(), this, 0, true)
+                        : null;
+        this.networkBuffersPerChannel = networkBuffersPerChannel;
+        this.needsRecovery = needsRecovery;
+    }
+
+    @Override
+    void setup() throws IOException {
+        if (needsRecovery && networkBuffersPerChannel > 0) {
+            bufferManager.requestExclusiveBuffers(networkBuffersPerChannel);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // RecoverableInputChannel implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void onRecoveredStateBuffer(Buffer buffer) {
+        boolean wasEmpty;
+        synchronized (recoveredBuffers) {
+            if (isReleased) {
+                buffer.recycleBuffer();
+                return;
             }
-            checkState(
-                    toBeConsumedBuffers.size() == expectedCount,
-                    "Buffer migration failed: expected %s buffers but got %s",
-                    expectedCount,
-                    toBeConsumedBuffers.size());
+            // Migrate recovered buffers from RecoveredInputChannel. These 
buffers have been
+            // filtered but not yet consumed by the Task.
+            wasEmpty = offerRecoveredBuffer(buffer);
+        }
+        if (wasEmpty) {
+            notifyChannelNonEmpty();
+        }
+    }
+
+    @Override
+    public void finishRecoveredBufferDelivery() throws IOException {
+        upstreamReady.join();
+        boolean wasEmpty;
+        synchronized (recoveredBuffers) {
+            checkState(inRecovery, "Recovery delivery already finished.");
+            // Append the sentinel after the last recovered buffer. The 
consume path flips out of
+            // recovery only once it polls this sentinel, guaranteeing all 
recovered buffers are
+            // consumed first.
+            wasEmpty =
+                    offerRecoveredBuffer(
+                            EventSerializer.toBuffer(
+                                    EndOfFetchedChannelStateEvent.INSTANCE, 
false));
+        }
+        if (wasEmpty) {
+            notifyChannelNonEmpty();
+        }
+    }
+
+    @Override
+    public Buffer requestRecoveryBufferBlocking() throws InterruptedException, 
IOException {
+        checkState(
+                bufferManager != null,
+                "requestRecoveryBufferBlocking called on a Local channel 
constructed with"
+                        + " needsRecovery=false");
+        upstreamReady.join();
+        return bufferManager.requestBufferBlocking();
+    }
+
+    @Override
+    public void insertRecoveryCheckpointBarrierIfInRecovery(long checkpointId) 
throws IOException {
+        boolean wasEmpty = false;
+        synchronized (recoveredBuffers) {
+            if (!isReleased && inRecovery) {
+                wasEmpty =
+                        offerRecoveredBuffer(
+                                EventSerializer.toBuffer(
+                                        new 
RecoveryCheckpointBarrier(checkpointId), false));
+            }
+        }
+        if (wasEmpty) {
+            notifyChannelNonEmpty();
+        }
+    }
+
+    /**
+     * Flips out of recovery the moment the consume path polls the {@code
+     * EndOfFetchedChannelStateEvent} sentinel, i.e. once all recovered 
buffers have been consumed.
+     * Live upstream data may flow again afterwards.
+     */
+    @Override
+    public void onRecoveredStateConsumed() {
+        synchronized (recoveredBuffers) {
+            checkState(inRecovery, "Recovery already finished.");
+            inRecovery = false;
+        }
+        notifyChannelNonEmpty();
+    }
+
+    /**
+     * Appends a recovered buffer (or {@code RecoveryCheckpointBarrier} / 
{@code
+     * EndOfFetchedChannelStateEvent} sentinel) to {@link #recoveredBuffers}.
+     *
+     * @return {@code true} iff {@link #recoveredBuffers} transitioned from 
empty to non-empty.
+     */
+    private boolean offerRecoveredBuffer(Buffer buffer) {
+        assert Thread.holdsLock(recoveredBuffers);
+        checkState(inRecovery, "Push into recovered buffers after recovery 
finished.");
+        boolean wasEmpty = recoveredBuffers.isEmpty();
+        recoveredBuffers.add(buffer);
+        return wasEmpty;
+    }
+
+    private int nextRecoverySequenceNumber() {
+        assert Thread.holdsLock(recoveredBuffers);
+        return recoverySequenceNumber++;
+    }
+
+    /**
+     * Walks {@link #recoveredBuffers} up to the {@link 
RecoveryCheckpointBarrier} sentinel matching
+     * {@code checkpointId}, retaining each pre-barrier recovered data buffer 
and removing the
+     * sentinel.
+     *
+     * @throws IOException if no sentinel matching {@code checkpointId} is 
found (the snapshot
+     *     protocol guarantees one must be present while the channel is in 
recovery).
+     */
+    private List<Buffer> collectPreRecoveryBarrier(long checkpointId)
+            throws IOException, CheckpointException {
+        assert Thread.holdsLock(recoveredBuffers);
+        List<Buffer> retained = new ArrayList<>();
+        try {
+            Iterator<Buffer> it = recoveredBuffers.iterator();
+            while (it.hasNext()) {
+                Buffer b = it.next();
+                if (isRecoveryCheckpointBarrier(b, checkpointId)) {
+                    it.remove();
+                    b.recycleBuffer();
+                    return retained;
+                }
+                if (b.isBuffer()) {
+                    retained.add(b.retainBuffer());
+                }
+            }
+        } catch (IOException e) {
+            releaseRetainedBuffers(retained);
+            throw e;
+        }
+        releaseRetainedBuffers(retained);
+        // The during-recovery sentinel for this checkpoint was never inserted 
into this channel
+        // (the recovery checkpoint trigger had already transitioned away from 
the drainer while
+        // this channel was still in recovery). The channel is simply not 
ready to snapshot
+        // recovered state for this checkpoint yet, so decline as 
TASK_NOT_READY: that is not
+        // counted against the tolerable-failure threshold, so the checkpoint 
is deferred and
+        // retried instead of failing the job. The recovered buffers remain 
queued and are
+        // captured by a later checkpoint, so no in-flight data is lost.
+        throw new CheckpointException(
+                "RecoveryCheckpointBarrier for checkpoint "
+                        + checkpointId
+                        + " not yet present in channel "
+                        + getChannelInfo(),
+                CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);

Review Comment:
   The `CHECKPOINT_DECLINED_TASK_NOT_READY` is used for RecoveredInputChannel 
makes sense because RecoveredInputChannel does not support checkpoint. 
   
   But it is using here has a bit risks, and does not make sense to me for some 
reasons since It will shallow potential bugs



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java:
##########
@@ -113,48 +157,220 @@ public LocalInputChannel(
 
         this.partitionManager = checkNotNull(partitionManager);
         this.taskEventPublisher = checkNotNull(taskEventPublisher);
-        this.channelStatePersister = new ChannelStatePersister(stateWriter, 
getChannelInfo());
-
-        // Migrate recovered buffers from RecoveredInputChannel if provided.
-        // These buffers have been filtered but not yet consumed by the Task.
-        if (!initialRecoveredBuffers.isEmpty()) {
-            final int expectedCount = initialRecoveredBuffers.size();
-            // Sequence number starts at Integer.MIN_VALUE, consistent with 
RecoveredInputChannel.
-            int seqNum = Integer.MIN_VALUE;
-            while (!initialRecoveredBuffers.isEmpty()) {
-                Buffer buffer = initialRecoveredBuffers.poll();
-                // Determine next data type based on the next buffer in the 
queue
-                Buffer.DataType nextDataType =
-                        initialRecoveredBuffers.isEmpty()
-                                ? Buffer.DataType.NONE
-                                : initialRecoveredBuffers.peek().getDataType();
-                // buffersInBacklog is set to 0 as these are recovered buffers
-                BufferAndBacklog bufferAndBacklog =
-                        new BufferAndBacklog(buffer, 0, nextDataType, 
seqNum++);
-                toBeConsumedBuffers.add(bufferAndBacklog);
+        this.channelStatePersister =
+                new ChannelStatePersister(checkNotNull(stateWriter), 
getChannelInfo());
+        this.inRecovery = needsRecovery;
+        this.bufferManager =
+                needsRecovery
+                        // review nit: false for consistency?
+                        ? new 
BufferManager(inputGate.getMemorySegmentProvider(), this, 0, true)
+                        : null;
+        this.networkBuffersPerChannel = networkBuffersPerChannel;
+        this.needsRecovery = needsRecovery;
+    }
+
+    @Override
+    void setup() throws IOException {
+        if (needsRecovery && networkBuffersPerChannel > 0) {
+            bufferManager.requestExclusiveBuffers(networkBuffersPerChannel);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // RecoverableInputChannel implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void onRecoveredStateBuffer(Buffer buffer) {
+        boolean wasEmpty;
+        synchronized (recoveredBuffers) {
+            if (isReleased) {
+                buffer.recycleBuffer();
+                return;
             }
-            checkState(
-                    toBeConsumedBuffers.size() == expectedCount,
-                    "Buffer migration failed: expected %s buffers but got %s",
-                    expectedCount,
-                    toBeConsumedBuffers.size());
+            // Migrate recovered buffers from RecoveredInputChannel. These 
buffers have been
+            // filtered but not yet consumed by the Task.
+            wasEmpty = offerRecoveredBuffer(buffer);
+        }
+        if (wasEmpty) {
+            notifyChannelNonEmpty();
+        }
+    }
+
+    @Override
+    public void finishRecoveredBufferDelivery() throws IOException {
+        upstreamReady.join();
+        boolean wasEmpty;
+        synchronized (recoveredBuffers) {
+            checkState(inRecovery, "Recovery delivery already finished.");
+            // Append the sentinel after the last recovered buffer. The 
consume path flips out of
+            // recovery only once it polls this sentinel, guaranteeing all 
recovered buffers are
+            // consumed first.
+            wasEmpty =
+                    offerRecoveredBuffer(
+                            EventSerializer.toBuffer(
+                                    EndOfFetchedChannelStateEvent.INSTANCE, 
false));
+        }
+        if (wasEmpty) {
+            notifyChannelNonEmpty();
+        }
+    }
+
+    @Override
+    public Buffer requestRecoveryBufferBlocking() throws InterruptedException, 
IOException {
+        checkState(
+                bufferManager != null,
+                "requestRecoveryBufferBlocking called on a Local channel 
constructed with"
+                        + " needsRecovery=false");
+        upstreamReady.join();
+        return bufferManager.requestBufferBlocking();
+    }
+
+    @Override
+    public void insertRecoveryCheckpointBarrierIfInRecovery(long checkpointId) 
throws IOException {
+        boolean wasEmpty = false;
+        synchronized (recoveredBuffers) {
+            if (!isReleased && inRecovery) {
+                wasEmpty =
+                        offerRecoveredBuffer(
+                                EventSerializer.toBuffer(
+                                        new 
RecoveryCheckpointBarrier(checkpointId), false));
+            }
+        }
+        if (wasEmpty) {
+            notifyChannelNonEmpty();
+        }
+    }
+
+    /**
+     * Flips out of recovery the moment the consume path polls the {@code
+     * EndOfFetchedChannelStateEvent} sentinel, i.e. once all recovered 
buffers have been consumed.
+     * Live upstream data may flow again afterwards.
+     */
+    @Override
+    public void onRecoveredStateConsumed() {
+        synchronized (recoveredBuffers) {
+            checkState(inRecovery, "Recovery already finished.");
+            inRecovery = false;
+        }
+        notifyChannelNonEmpty();
+    }
+
+    /**
+     * Appends a recovered buffer (or {@code RecoveryCheckpointBarrier} / 
{@code
+     * EndOfFetchedChannelStateEvent} sentinel) to {@link #recoveredBuffers}.
+     *
+     * @return {@code true} iff {@link #recoveredBuffers} transitioned from 
empty to non-empty.
+     */
+    private boolean offerRecoveredBuffer(Buffer buffer) {
+        assert Thread.holdsLock(recoveredBuffers);
+        checkState(inRecovery, "Push into recovered buffers after recovery 
finished.");
+        boolean wasEmpty = recoveredBuffers.isEmpty();
+        recoveredBuffers.add(buffer);
+        return wasEmpty;
+    }
+
+    private int nextRecoverySequenceNumber() {
+        assert Thread.holdsLock(recoveredBuffers);
+        return recoverySequenceNumber++;
+    }
+
+    /**
+     * Walks {@link #recoveredBuffers} up to the {@link 
RecoveryCheckpointBarrier} sentinel matching
+     * {@code checkpointId}, retaining each pre-barrier recovered data buffer 
and removing the
+     * sentinel.
+     *
+     * @throws IOException if no sentinel matching {@code checkpointId} is 
found (the snapshot
+     *     protocol guarantees one must be present while the channel is in 
recovery).
+     */
+    private List<Buffer> collectPreRecoveryBarrier(long checkpointId)
+            throws IOException, CheckpointException {
+        assert Thread.holdsLock(recoveredBuffers);
+        List<Buffer> retained = new ArrayList<>();
+        try {
+            Iterator<Buffer> it = recoveredBuffers.iterator();
+            while (it.hasNext()) {
+                Buffer b = it.next();
+                if (isRecoveryCheckpointBarrier(b, checkpointId)) {
+                    it.remove();
+                    b.recycleBuffer();
+                    return retained;
+                }
+                if (b.isBuffer()) {
+                    retained.add(b.retainBuffer());
+                }
+            }
+        } catch (IOException e) {
+            releaseRetainedBuffers(retained);
+            throw e;
+        }
+        releaseRetainedBuffers(retained);
+        // The during-recovery sentinel for this checkpoint was never inserted 
into this channel
+        // (the recovery checkpoint trigger had already transitioned away from 
the drainer while
+        // this channel was still in recovery). The channel is simply not 
ready to snapshot
+        // recovered state for this checkpoint yet, so decline as 
TASK_NOT_READY: that is not
+        // counted against the tolerable-failure threshold, so the checkpoint 
is deferred and
+        // retried instead of failing the job. The recovered buffers remain 
queued and are
+        // captured by a later checkpoint, so no in-flight data is lost.
+        throw new CheckpointException(
+                "RecoveryCheckpointBarrier for checkpoint "
+                        + checkpointId
+                        + " not yet present in channel "
+                        + getChannelInfo(),
+                CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);

Review Comment:
   IIUC, it never happens, related login in 
`ChannelState#onCheckpointStartedForAllInputs`.
   
   - if the task is not ready for checkpoint, the 
`recoveryCheckpointTrigger.snapshotAndInsertBarriers(cpId);` will throws `new 
IllegalStateException("RecoveryCheckpointTrigger is not ready yet");`
   - Then `checkpointStarted` won't be called
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to