This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03b2e704e3417cefadbc62a61a81efa685e24bea
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 23 17:54:34 2020 +0100

    [FLINK-20222][checkpointing] Operator Coordinators are reset with null 
state when no checkpoint or state is available.
    
    This includes the following cases:
      - JobManager/Scheduler/Coordinator failover happens before the first 
checkpoint has completed.
      - Coordinator has not stored any state in the checkpoint.
    
    In both cases, it is useful to signal to the coordinators that a 
reset-to-checkpoint happened, even if
    they do not have checkpointed state to restore.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 82 +++++++++++++++++-----
 .../OperatorCoordinatorCheckpointContext.java      | 15 +++-
 .../coordination/OperatorCoordinator.java          | 10 ++-
 .../coordination/OperatorCoordinatorHolder.java    |  2 +-
 .../RecreateOnResetOperatorCoordinator.java        |  4 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  5 +-
 .../source/coordinator/SourceCoordinator.java      |  9 ++-
 .../OperatorCoordinatorSchedulerTest.java          | 12 ++++
 .../coordination/TestingOperatorCoordinator.java   |  8 ++-
 .../collect/CollectSinkOperatorCoordinator.java    | 13 ++--
 10 files changed, 129 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0e61a11..61d6f22 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1226,7 +1226,11 @@ public class CheckpointCoordinator {
                        boolean errorIfNoCheckpoint,
                        boolean allowNonRestoredState) throws Exception {
 
-               return restoreLatestCheckpointedStateInternal(new 
HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
+               return restoreLatestCheckpointedStateInternal(
+                               new HashSet<>(tasks.values()),
+                               
OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET,
+                               errorIfNoCheckpoint,
+                               allowNonRestoredState);
        }
 
        /**
@@ -1257,7 +1261,11 @@ public class CheckpointCoordinator {
                //     of the restarted region), meaning there will be 
unmatched state by design.
                //   - because what we might end up restoring from an original 
savepoint with unmatched
                //     state, if there is was no checkpoint yet.
-               return restoreLatestCheckpointedStateInternal(tasks, false, 
false, true);
+               return restoreLatestCheckpointedStateInternal(
+                               tasks,
+                               OperatorCoordinatorRestoreBehavior.SKIP, // 
local/regional recovery does not reset coordinators
+                               false,         // recovery might come before 
first successful checkpoint
+                               true);       // see explanation above
        }
 
        /**
@@ -1287,12 +1295,34 @@ public class CheckpointCoordinator {
                        final Set<ExecutionJobVertex> tasks,
                        final boolean allowNonRestoredState) throws Exception {
 
-               return restoreLatestCheckpointedStateInternal(tasks, true, 
false, allowNonRestoredState);
+               return restoreLatestCheckpointedStateInternal(
+                               tasks,
+                               
OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, // global recovery 
restores coordinators, or resets them to empty
+                               false,   // recovery might come before first 
successful checkpoint
+                               allowNonRestoredState);
+       }
+
+       /**
+        * Restores the latest checkpointed state at the beginning of the job 
execution.
+        * If there is a checkpoint, this method acts like a "global 
restore"-style
+        * operation where all stateful tasks and coordinators from the given
+        * set of Job Vertices are restored.
+        *
+        * @param tasks Set of job vertices to restore. State for these 
vertices is
+        *              restored via {@link 
Execution#setInitialState(JobManagerTaskRestore)}.
+        * @return True, if a checkpoint was found and its state was restored, 
false otherwise.
+        */
+       public boolean restoreInitialCheckpointIfPresent(final 
Set<ExecutionJobVertex> tasks) throws Exception {
+               return restoreLatestCheckpointedStateInternal(
+                       tasks,
+                       
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
+                       false,    // initial checkpoints exist only on 
JobManager failover. ok if not present.
+                       false); // JobManager failover means JobGraphs match 
exactly.
        }
 
        private boolean restoreLatestCheckpointedStateInternal(
                final Set<ExecutionJobVertex> tasks,
-               final boolean restoreCoordinators,
+               final OperatorCoordinatorRestoreBehavior 
operatorCoordinatorRestoreBehavior,
                final boolean errorIfNoCheckpoint,
                final boolean allowNonRestoredState) throws Exception {
 
@@ -1321,14 +1351,23 @@ public class CheckpointCoordinator {
                        CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
 
                        if (latest == null) {
+                               LOG.info("No checkpoint found during restore.");
+
                                if (errorIfNoCheckpoint) {
                                        throw new IllegalStateException("No 
completed checkpoint available");
-                               } else {
+                               }
+
+                               if (operatorCoordinatorRestoreBehavior == 
OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET) {
+                                       // we let the JobManager-side 
components know that there was a recovery,
+                                       // even if there was no checkpoint to 
recover from, yet
                                        LOG.debug("Resetting the master 
hooks.");
                                        MasterHooks.reset(masterHooks.values(), 
LOG);
 
-                                       return false;
+                                       LOG.info("Resetting the Coordinators to 
an empty state.");
+                                       
restoreStateToCoordinators(Collections.emptyMap());
                                }
+
+                               return false;
                        }
 
                        LOG.info("Restoring job {} from latest valid 
checkpoint: {}.", job, latest);
@@ -1352,7 +1391,7 @@ public class CheckpointCoordinator {
                                        allowNonRestoredState,
                                        LOG);
 
-                       if (restoreCoordinators) {
+                       if (operatorCoordinatorRestoreBehavior != 
OperatorCoordinatorRestoreBehavior.SKIP) {
                                restoreStateToCoordinators(operatorStates);
                        }
 
@@ -1410,7 +1449,11 @@ public class CheckpointCoordinator {
 
                LOG.info("Reset the checkpoint ID of job {} to {}.", job, 
nextCheckpointId);
 
-               return restoreLatestCheckpointedStateInternal(new 
HashSet<>(tasks.values()), true, true, allowNonRestored);
+               return restoreLatestCheckpointedStateInternal(
+                               new HashSet<>(tasks.values()),
+                               
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
+                               true,
+                               allowNonRestored);
        }
 
        // 
------------------------------------------------------------------------
@@ -1567,14 +1610,9 @@ public class CheckpointCoordinator {
        private void restoreStateToCoordinators(final Map<OperatorID, 
OperatorState> operatorStates) throws Exception {
                for (OperatorCoordinatorCheckpointContext coordContext : 
coordinatorsToCheckpoint) {
                        final OperatorState state = 
operatorStates.get(coordContext.operatorId());
-                       if (state == null) {
-                               continue;
-                       }
-
-                       final ByteStreamStateHandle coordinatorState = 
state.getCoordinatorState();
-                       if (coordinatorState != null) {
-                               
coordContext.resetToCheckpoint(coordinatorState.getData());
-                       }
+                       final ByteStreamStateHandle coordinatorState = state == 
null ? null : state.getCoordinatorState();
+                       final byte[] bytes = coordinatorState == null ? null : 
coordinatorState.getData();
+                       coordContext.resetToCheckpoint(bytes);
                }
        }
 
@@ -1848,4 +1886,16 @@ public class CheckpointCoordinator {
                        return props.forceCheckpoint();
                }
        }
+
+       private enum OperatorCoordinatorRestoreBehavior {
+
+               /** Coordinators are always restored. If there is no 
checkpoint, they are restored empty. */
+               RESTORE_OR_RESET,
+
+               /** Coordinators are restored if there was a checkpoint. */
+               RESTORE_IF_CHECKPOINT_PRESENT,
+
+               /** Coordinators are not restored during this checkpoint 
restore. */
+               SKIP;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index f5f72d8..2f14e58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointListener;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -52,5 +54,16 @@ public interface OperatorCoordinatorCheckpointContext 
extends OperatorInfo, Chec
        @Override
        default void notifyCheckpointAborted(long checkpointId) {}
 
-       void resetToCheckpoint(byte[] checkpointData) throws Exception;
+       /**
+        * Resets the coordinator to the checkpoint with the given state.
+        *
+        * <p>This method is called with a null state argument in the following 
situations:
+        * <ul>
+        *   <li>There is a recovery and there was no completed checkpoint 
yet.</li>
+        *   <li>There is a recovery from a completed checkpoint/savepoint but 
it contained no state
+        *       for the coordinator.</li>
+        * </ul>
+        * In both cases, the coordinator should reset to an empty (new) state.
+        */
+       void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index 21b67f2..1a6a640 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -127,6 +127,14 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * to {@code Context} methods. For example, Events being sent by the 
Coordinator after this method
         * returns are assumed to take place after the checkpoint that was 
restored.
         *
+        * <p>This method is called with a null state argument in the following 
situations:
+        * <ul>
+        *   <li>There is a recovery and there was no completed checkpoint 
yet.</li>
+        *   <li>There is a recovery from a completed checkpoint/savepoint but 
it contained no state
+        *       for the coordinator.</li>
+        * </ul>
+        * In both cases, the coordinator should reset to an empty (new) state.
+        *
         * <h2>Restoring implicitly notifies of Checkpoint Completion</h2>
         *
         * <p>Restoring to a checkpoint is a way of confirming that the 
checkpoint is complete.
@@ -137,7 +145,7 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * complete (for example when a system failure happened directly after 
committing the checkpoint,
         * before calling the {@link #notifyCheckpointComplete(long)} method).
         */
-       void resetToCheckpoint(byte[] checkpointData) throws Exception;
+       void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception;
 
        // 
------------------------------------------------------------------------
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index e5844ef..4c7f252 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -229,7 +229,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        }
 
        @Override
-       public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+       public void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception {
                // ideally we would like to check this here, however this 
method is called early during
                // execution graph construction, before the main thread 
executor is set
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index ae67868..52b30ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -105,7 +105,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+       public void resetToCheckpoint(@Nullable byte[] checkpointData) {
                // First bump up the coordinator epoch to fence out the active 
coordinator.
                LOG.info("Resetting coordinator to checkpoint.");
                // Replace the coordinator variable with a new 
DeferrableCoordinator instance.
@@ -366,7 +366,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
                        internalCoordinator.start();
                }
 
-               void resetAndStart(byte[] checkpointData, boolean started) {
+               void resetAndStart(@Nullable byte[] checkpointData, boolean 
started) {
                        if (failed || closed || internalCoordinator == null) {
                                return;
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 42d3e02..f65f3c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -245,9 +245,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 
                if (checkpointCoordinator != null) {
                        // check whether we find a valid checkpoint
-                       if 
(!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
-                               new 
HashSet<>(newExecutionGraph.getAllVertices().values()),
-                               false)) {
+                       if 
(!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                                       new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
 
                                // check whether we can restore from a savepoint
                                
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 07ebbdb..3dea157 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -223,10 +223,17 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT> implements
        }
 
        @Override
-       public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+       public void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception {
                checkState(!started, "The coordinator can only be reset if it 
was not yet started");
                assert enumerator == null;
 
+               // the checkpoint data is null if there was no completed 
checkpoint before;
+               // in that case we don't restore here, but let a fresh 
SplitEnumerator be created
+               // when "start()" is called.
+               if (checkpointData == null) {
+                       return;
+               }
+
                LOG.info("Restoring SplitEnumerator of source {} from 
checkpoint.", operatorName);
 
                final ClassLoader userCodeClassLoader = 
context.getCoordinatorContext().getUserCodeClassloader();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 353b6b9..024cd46 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -78,6 +78,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -322,6 +323,17 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
        }
 
        @Test
+       public void testGlobalFailureBeforeCheckpointResetsToEmptyState() 
throws Exception {
+               final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               failGlobalAndRestart(scheduler, new TestException());
+
+               assertSame("coordinator should have null restored state",
+                       TestingOperatorCoordinator.NULL_RESTORE_VALUE, 
coordinator.getLastRestoredCheckpointState());
+       }
+
+       @Test
        public void testLocalFailureDoesNotResetToCheckpoint() throws Exception 
{
                final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();
                final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 5c5b91f..9572601 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -35,6 +35,8 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
 class TestingOperatorCoordinator implements OperatorCoordinator {
 
+       public static final byte[] NULL_RESTORE_VALUE = new byte[0];
+
        private final OperatorCoordinator.Context context;
 
        private final ArrayList<Integer> failedTasks = new ArrayList<>();
@@ -104,8 +106,10 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void resetToCheckpoint(byte[] checkpointData) {
-               lastRestoredCheckpointState = checkpointData;
+       public void resetToCheckpoint(@Nullable byte[] checkpointData) {
+               lastRestoredCheckpointState = checkpointData == null
+                               ? NULL_RESTORE_VALUE
+                               : checkpointData;
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index 1423415..5454412 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -203,10 +203,15 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
        }
 
        @Override
-       public void resetToCheckpoint(byte[] checkpointData) throws Exception {
-               ByteArrayInputStream bais = new 
ByteArrayInputStream(checkpointData);
-               ObjectInputStream ois = new ObjectInputStream(bais);
-               address = (InetSocketAddress) ois.readObject();
+       public void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception {
+               if (checkpointData == null) {
+                       // restore before any checkpoint completed
+                       closeConnection();
+               } else {
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(checkpointData);
+                       ObjectInputStream ois = new ObjectInputStream(bais);
+                       address = (InetSocketAddress) ois.readObject();
+               }
        }
 
        /**

Reply via email to