Repository: flink
Updated Branches:
  refs/heads/master 8d180d5fa -> 56c756040


[hotfix] Improved logging for task local recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56c75604
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56c75604
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56c75604

Branch: refs/heads/master
Commit: 56c756040fc4b3f224c4e7c12d208a3ccf5a7c5e
Parents: 8d180d5
Author: Stefan Richter <[email protected]>
Authored: Mon Feb 26 18:03:14 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Tue Feb 27 15:28:11 2018 +0100

----------------------------------------------------------------------
 .../PrioritizedOperatorSubtaskState.java        | 31 +++++----
 .../TaskExecutorLocalStateStoresManager.java    | 68 ++++++++++++++------
 .../runtime/state/TaskLocalStateStoreImpl.java  | 63 ++++++++++++------
 .../runtime/state/TaskStateManagerImpl.java     | 27 +++++---
 .../PrioritizedOperatorSubtaskStateTest.java    |  4 +-
 .../runtime/state/TaskStateManagerImplTest.java |  4 +-
 .../api/operators/BackendRestorerProcedure.java | 61 ++++++++++++------
 .../StreamTaskStateInitializerImpl.java         | 20 ++++--
 .../operators/BackendRestorerProcedureTest.java | 18 ++----
 9 files changed, 193 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
index f48d311..512f912 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
@@ -27,7 +27,6 @@ import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.function.BiFunction;
 
@@ -79,39 +78,39 @@ public class PrioritizedOperatorSubtaskState {
        // 
-----------------------------------------------------------------------------------------------------------------
 
        /**
-        * Returns an iterator over all alternative snapshots to restore the 
managed operator state, in the order in which
-        * we should attempt to restore.
+        * Returns an immutable list with all alternative snapshots to restore 
the managed operator state, in the order in
+        * which we should attempt to restore.
         */
        @Nonnull
-       public Iterator<StateObjectCollection<OperatorStateHandle>> 
getPrioritizedManagedOperatorState() {
-               return prioritizedManagedOperatorState.iterator();
+       public List<StateObjectCollection<OperatorStateHandle>> 
getPrioritizedManagedOperatorState() {
+               return prioritizedManagedOperatorState;
        }
 
        /**
-        * Returns an iterator over all alternative snapshots to restore the 
raw operator state, in the order in which we
-        * should attempt to restore.
+        * Returns an immutable list with all alternative snapshots to restore 
the raw operator state, in the order in
+        * which we should attempt to restore.
         */
        @Nonnull
-       public Iterator<StateObjectCollection<OperatorStateHandle>> 
getPrioritizedRawOperatorState() {
-               return prioritizedRawOperatorState.iterator();
+       public List<StateObjectCollection<OperatorStateHandle>> 
getPrioritizedRawOperatorState() {
+               return prioritizedRawOperatorState;
        }
 
        /**
-        * Returns an iterator over all alternative snapshots to restore the 
managed keyed state, in the order in which we
-        * should attempt to restore.
+        * Returns an immutable list with all alternative snapshots to restore 
the managed keyed state, in the order in
+        * which we should attempt to restore.
         */
        @Nonnull
-       public Iterator<StateObjectCollection<KeyedStateHandle>> 
getPrioritizedManagedKeyedState() {
-               return prioritizedManagedKeyedState.iterator();
+       public List<StateObjectCollection<KeyedStateHandle>> 
getPrioritizedManagedKeyedState() {
+               return prioritizedManagedKeyedState;
        }
 
        /**
-        * Returns an iterator over all alternative snapshots to restore the 
raw keyed state, in the order in which we
+        * Returns an immutable list with all alternative snapshots to restore 
the raw keyed state, in the order in which we
         * should attempt to restore.
         */
        @Nonnull
-       public Iterator<StateObjectCollection<KeyedStateHandle>> 
getPrioritizedRawKeyedState() {
-               return prioritizedRawKeyedState.iterator();
+       public List<StateObjectCollection<KeyedStateHandle>> 
getPrioritizedRawKeyedState() {
+               return prioritizedRawKeyedState;
        }
 
        // 
-----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index a940aef..e7a7d8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -110,38 +110,68 @@ public class TaskExecutorLocalStateStoresManager {
                                        "register a new TaskLocalStateStore.");
                        }
 
-                       final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
taskStateManagers =
-                               
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new 
HashMap<>());
+                       Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
taskStateManagers =
+                               
this.taskStateStoresByAllocationID.get(allocationID);
+
+                       if (taskStateManagers == null) {
+                               taskStateManagers = new HashMap<>();
+                               
this.taskStateStoresByAllocationID.put(allocationID, taskStateManagers);
+
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Registered new allocation id 
{} for local state stores for job {}.",
+                                               allocationID, jobId);
+                               }
+                       }
 
                        final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
 
-                       // create the allocation base dirs, one inside each 
root dir.
-                       File[] allocationBaseDirectories = 
allocationBaseDirectories(allocationID);
+                       TaskLocalStateStoreImpl taskLocalStateStore = 
taskStateManagers.get(taskKey);
 
-                       LocalRecoveryDirectoryProviderImpl directoryProvider = 
new LocalRecoveryDirectoryProviderImpl(
-                               allocationBaseDirectories,
-                               jobId,
-                               jobVertexID,
-                               subtaskIndex);
+                       if (taskLocalStateStore == null) {
+
+                               // create the allocation base dirs, one inside 
each root dir.
+                               File[] allocationBaseDirectories = 
allocationBaseDirectories(allocationID);
+
+                               LocalRecoveryDirectoryProviderImpl 
directoryProvider = new LocalRecoveryDirectoryProviderImpl(
+                                       allocationBaseDirectories,
+                                       jobId,
+                                       jobVertexID,
+                                       subtaskIndex);
 
-                       LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(
-                               localRecoveryMode,
-                               directoryProvider);
+                               LocalRecoveryConfig localRecoveryConfig =
+                                       new 
LocalRecoveryConfig(localRecoveryMode, directoryProvider);
 
-                       return taskStateManagers.computeIfAbsent(
-                               taskKey,
-                               k -> new TaskLocalStateStoreImpl(
+                               taskLocalStateStore = new 
TaskLocalStateStoreImpl(
                                        jobId,
                                        allocationID,
                                        jobVertexID,
                                        subtaskIndex,
                                        localRecoveryConfig,
-                                       discardExecutor));
+                                       discardExecutor);
+
+                               taskStateManagers.put(taskKey, 
taskLocalStateStore);
+
+                               if (LOG.isTraceEnabled()) {
+                                       LOG.trace("Registered new local state 
store with configuration {} for {} - {} - {} under allocation id {}.",
+                                               localRecoveryConfig, jobId, 
jobVertexID, subtaskIndex, allocationID);
+                               }
+                       } else {
+                               if (LOG.isTraceEnabled()) {
+                                       LOG.trace("Found existing local state 
store for {} - {} - {} under allocation id {}.",
+                                               jobId, jobVertexID, 
subtaskIndex, allocationID);
+                               }
+                       }
+
+                       return taskLocalStateStore;
                }
        }
 
        public void releaseLocalStateForAllocationId(@Nonnull AllocationID 
allocationID) {
 
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Releasing local state under allocation id 
{}.", allocationID);
+               }
+
                Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
cleanupLocalStores;
 
                synchronized (lock) {
@@ -175,7 +205,7 @@ public class TaskExecutorLocalStateStoresManager {
 
                ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
 
-               LOG.debug("Shutting down TaskExecutorLocalStateStoresManager.");
+               LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
 
                for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> entry :
                        toRelease.entrySet()) {
@@ -217,7 +247,7 @@ public class TaskExecutorLocalStateStoresManager {
                                try {
                                        stateStore.dispose();
                                } catch (Exception disposeEx) {
-                                       LOG.warn("Exception while disposing 
local state store " + stateStore, disposeEx);
+                                       LOG.warn("Exception while disposing 
local state store {}.", stateStore, disposeEx);
                                }
                        }
                }
@@ -233,7 +263,7 @@ public class TaskExecutorLocalStateStoresManager {
                        try {
                                FileUtils.deleteFileOrDirectory(directory);
                        } catch (IOException e) {
-                               LOG.warn("Exception while deleting local state 
directory for allocation " + allocationID, e);
+                               LOG.warn("Exception while deleting local state 
directory for allocation id {}.", allocationID, e);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 191c109..bb4f011 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -103,15 +103,15 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
                @Nonnull LocalRecoveryConfig localRecoveryConfig,
                @Nonnull Executor discardExecutor) {
 
+               this.lock = new Object();
+               this.storedTaskStateByCheckpointID = new TreeMap<>();
                this.jobID = jobID;
                this.allocationID = allocationID;
                this.jobVertexID = jobVertexID;
                this.subtaskIndex = subtaskIndex;
                this.discardExecutor = discardExecutor;
-               this.lock = new Object();
-               this.storedTaskStateByCheckpointID = new TreeMap<>();
-               this.disposed = false;
                this.localRecoveryConfig = localRecoveryConfig;
+               this.disposed = false;
        }
 
        @Override
@@ -123,8 +123,15 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
                        localState = NULL_DUMMY;
                }
 
-               LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
-               LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
+               if (LOG.isTraceEnabled()) {
+                       LOG.trace(
+                               "Stored local state for checkpoint {} in 
subtask ({} - {} - {}) : {}.",
+                               checkpointId, jobID, jobVertexID, subtaskIndex, 
localState);
+               } else if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Stored local state for checkpoint {} in 
subtask ({} - {} - {})",
+                               checkpointId, jobID, jobVertexID, subtaskIndex);
+               }
 
                Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
 
@@ -148,10 +155,21 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
        @Override
        @Nullable
        public TaskStateSnapshot retrieveLocalState(long checkpointID) {
+
+               TaskStateSnapshot snapshot;
                synchronized (lock) {
-                       TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
-                       return snapshot != NULL_DUMMY ? snapshot : null;
+                       snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
                }
+
+               if (LOG.isTraceEnabled()) {
+                       LOG.trace("Found entry for local state for checkpoint 
{} in subtask ({} - {} - {}) : {}",
+                               checkpointID, jobID, jobVertexID, subtaskIndex, 
snapshot);
+               } else if (LOG.isDebugEnabled()) {
+                       LOG.debug("Found entry for local state for checkpoint 
{} in subtask ({} - {} - {})",
+                               checkpointID, jobID, jobVertexID, subtaskIndex);
+               }
+
+               return snapshot != NULL_DUMMY ? snapshot : null;
        }
 
        @Override
@@ -163,7 +181,8 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
        @Override
        public void confirmCheckpoint(long confirmedCheckpointId) {
 
-               LOG.debug("Received confirmation for checkpoint {}. Starting to 
prune history.", confirmedCheckpointId);
+               LOG.debug("Received confirmation for checkpoint {} in subtask 
({} - {} - {}). Starting to prune history.",
+                       confirmedCheckpointId, jobID, jobVertexID, 
subtaskIndex);
 
                final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new 
ArrayList<>();
 
@@ -216,7 +235,8 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
                                        try {
                                                
deleteDirectory(subtaskBaseDirectory);
                                        } catch (IOException e) {
-                                               LOG.warn("Exception when 
deleting local recovery subtask base dir: " + subtaskBaseDirectory, e);
+                                               LOG.warn("Exception when 
deleting local recovery subtask base directory {} in subtask ({} - {} - {})",
+                                                       subtaskBaseDirectory, 
jobID, jobVertexID, subtaskIndex, e);
                                        }
                                }
                        },
@@ -240,27 +260,32 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
         */
        private void discardLocalStateForCheckpoint(long checkpointID, 
TaskStateSnapshot o) {
 
+               if (LOG.isTraceEnabled()) {
+                       LOG.trace("Discarding local task state snapshot {} of 
checkpoint {} for subtask ({} - {} - {}).",
+                               o, checkpointID, jobID, jobVertexID, 
subtaskIndex);
+               } else {
+                       LOG.debug("Discarding local task state snapshot of 
checkpoint {} for subtask ({} - {} - {}).",
+                               checkpointID, jobID, jobVertexID, subtaskIndex);
+               }
+
                try {
-                       if (LOG.isTraceEnabled()) {
-                               LOG.trace("Discarding local task state snapshot 
of checkpoint {} for {}/{}/{}.",
-                                       checkpointID, jobID, jobVertexID, 
subtaskIndex);
-                       } else {
-                               LOG.debug("Discarding local task state snapshot 
{} of checkpoint {} for {}/{}/{}.",
-                                       o, checkpointID, jobID, jobVertexID, 
subtaskIndex);
-                       }
                        o.discardState();
                } catch (Exception discardEx) {
-                       LOG.warn("Exception while discarding local task state 
snapshot of checkpoint " + checkpointID + ".", discardEx);
+                       LOG.warn("Exception while discarding local task state 
snapshot of checkpoint {} in subtask ({} - {} - {}).",
+                               checkpointID, jobID, jobVertexID, subtaskIndex, 
discardEx);
                }
 
                LocalRecoveryDirectoryProvider directoryProvider = 
localRecoveryConfig.getLocalStateDirectoryProvider();
                File checkpointDir = 
directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID);
-               LOG.debug("Deleting local state directory {} of checkpoint {} 
for {}/{}/{}/{}.",
+
+               LOG.debug("Deleting local state directory {} of checkpoint {} 
for subtask ({} - {} - {}).",
                        checkpointDir, checkpointID, jobID, jobVertexID, 
subtaskIndex);
+
                try {
                        deleteDirectory(checkpointDir);
                } catch (IOException ex) {
-                       LOG.warn("Exception while deleting local state 
directory of checkpoint " + checkpointID + ".", ex);
+                       LOG.warn("Exception while deleting local state 
directory of checkpoint {} in subtask ({} - {} - {}).",
+                               checkpointID, jobID, jobVertexID, subtaskIndex, 
ex);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 3acca7c..e057d19 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -29,10 +29,14 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.List;
 
 /**
  * This class is the default implementation of {@link TaskStateManager} and 
collaborates with the job manager
@@ -40,13 +44,14 @@ import java.util.Collections;
  * not have to deal with the differences between remote or local state on 
recovery because this class handles both
  * cases transparently.
  *
- * Reported state is tagged by clients so that this class can properly forward 
to the right receiver for the
+ * <p>Reported state is tagged by clients so that this class can properly 
forward to the right receiver for the
  * checkpointed state.
- *
- * TODO: all interaction with local state store must still be implemented! It 
is currently just a placeholder.
  */
 public class TaskStateManagerImpl implements TaskStateManager {
 
+       /** The logger for this class. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskStateManagerImpl.class);
+
        /** The id of the job for which this manager was created, can report, 
and recover. */
        private final JobID jobId;
 
@@ -117,21 +122,27 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
                TaskStateSnapshot localStateSnapshot =
                        
localStateStore.retrieveLocalState(jobManagerTaskRestore.getRestoreCheckpointId());
 
+               List<OperatorSubtaskState> alternativesByPriority = 
Collections.emptyList();
+
                if (localStateSnapshot != null) {
                        OperatorSubtaskState localSubtaskState = 
localStateSnapshot.getSubtaskStateByOperatorID(operatorID);
 
                        if (localSubtaskState != null) {
-                               PrioritizedOperatorSubtaskState.Builder builder 
= new PrioritizedOperatorSubtaskState.Builder(
-                                       jobManagerSubtaskState,
-                                       
Collections.singletonList(localSubtaskState));
-                               return builder.build();
+                               alternativesByPriority = 
Collections.singletonList(localSubtaskState);
                        }
                }
 
+               if (LOG.isTraceEnabled()) {
+                       LOG.trace("Operator {} has remote state {} from job 
manager and local state alternatives {} from local " +
+                                       "state store {}.",
+                               operatorID, jobManagerSubtaskState, 
alternativesByPriority, localStateStore);
+               }
+
                PrioritizedOperatorSubtaskState.Builder builder = new 
PrioritizedOperatorSubtaskState.Builder(
                        jobManagerSubtaskState,
-                       Collections.emptyList(),
+                       alternativesByPriority,
                        true);
+
                return builder.build();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java
index 09c9efb..82082e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java
@@ -216,7 +216,7 @@ public class PrioritizedOperatorSubtaskStateTest extends 
TestLogger {
 
        private <T extends StateObject> boolean checkResultAsExpected(
                Function<OperatorSubtaskState, StateObjectCollection<T>> 
extractor,
-               Function<PrioritizedOperatorSubtaskState, 
Iterator<StateObjectCollection<T>>> extractor2,
+               Function<PrioritizedOperatorSubtaskState, 
List<StateObjectCollection<T>>> extractor2,
                PrioritizedOperatorSubtaskState prioritizedResult,
                OperatorSubtaskState... expectedOrdered) {
 
@@ -226,7 +226,7 @@ public class PrioritizedOperatorSubtaskStateTest extends 
TestLogger {
                }
 
                return checkRepresentSameOrder(
-                       extractor2.apply(prioritizedResult),
+                       extractor2.apply(prioritizedResult).iterator(),
                        collector.toArray(new 
StateObjectCollection[collector.size()]));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index 926c196..f58f3f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -142,7 +142,7 @@ public class TaskStateManagerImplTest extends TestLogger {
 
                // checks for operator 1.
                Iterator<StateObjectCollection<KeyedStateHandle>> 
prioritizedManagedKeyedState_1 =
-                       prioritized_1.getPrioritizedManagedKeyedState();
+                       
prioritized_1.getPrioritizedManagedKeyedState().iterator();
 
                Assert.assertTrue(prioritizedManagedKeyedState_1.hasNext());
                StateObjectCollection<KeyedStateHandle> current = 
prioritizedManagedKeyedState_1.next();
@@ -158,7 +158,7 @@ public class TaskStateManagerImplTest extends TestLogger {
 
                // checks for operator 2.
                Iterator<StateObjectCollection<KeyedStateHandle>> 
prioritizedRawKeyedState_2 =
-                       prioritized_2.getPrioritizedRawKeyedState();
+                       prioritized_2.getPrioritizedRawKeyedState().iterator();
 
                Assert.assertTrue(prioritizedRawKeyedState_2.hasNext());
                current = prioritizedRawKeyedState_2.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index ba27a0a..dd75fb2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -36,7 +36,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.List;
 
 /**
  * This class implements the logic that creates (and potentially restores) a 
state backend. The restore logic
@@ -62,6 +62,9 @@ public class BackendRestorerProcedure<
        /** This registry is used so that recovery can participate in the task 
lifecycle, i.e. can be canceled. */
        private final CloseableRegistry backendCloseableRegistry;
 
+       /** Description of this instance for logging. */
+       private final String logDescription;
+
        /**
         * Creates a new backend restorer using the given backend supplier and 
the closeable registry.
         *
@@ -70,43 +73,63 @@ public class BackendRestorerProcedure<
         */
        public BackendRestorerProcedure(
                @Nonnull SupplierWithException<T, Exception> instanceSupplier,
-               @Nonnull CloseableRegistry backendCloseableRegistry) {
+               @Nonnull CloseableRegistry backendCloseableRegistry,
+               @Nonnull String logDescription) {
 
                this.instanceSupplier = 
Preconditions.checkNotNull(instanceSupplier);
                this.backendCloseableRegistry = 
Preconditions.checkNotNull(backendCloseableRegistry);
+               this.logDescription = Preconditions.checkNotNull(logDescription);
        }
 
        /**
         * Creates a new state backend and restores it from the provided set of 
state snapshot alternatives.
         *
-        * @param restoreOptions iterator over a prioritized set of state 
snapshot alternatives for recovery.
+        * @param restoreOptions list of prioritized state snapshot 
alternatives for recovery.
         * @return the created (and restored) state backend.
         * @throws Exception if the backend could not be created or restored.
         */
-       public @Nonnull
-       T createAndRestore(@Nonnull Iterator<? extends Collection<S>> 
restoreOptions) throws Exception {
+       @Nonnull
+       public T createAndRestore(@Nonnull List<? extends Collection<S>> 
restoreOptions) throws Exception {
+
+               if (restoreOptions.isEmpty()) {
+                       restoreOptions = 
Collections.singletonList(Collections.emptyList());
+               }
+
+               int alternativeIdx = 0;
+
+               Exception collectedException = null;
+
+               while (alternativeIdx < restoreOptions.size()) {
+
+                       Collection<S> restoreState = 
restoreOptions.get(alternativeIdx);
 
-               // This ensures that we always call the restore method even if 
there is no previous state
-               // (required by some backends).
-               Collection<S> attemptState = restoreOptions.hasNext() ?
-                       restoreOptions.next() :
-                       Collections.emptyList();
+                       ++alternativeIdx;
+
+                       if (restoreState.isEmpty()) {
+                               LOG.debug("Creating {} with empty state.", 
logDescription);
+                       } else {
+                               if (LOG.isTraceEnabled()) {
+                                       LOG.trace("Creating {} and restoring 
with state {} from alternative ({}/{}).",
+                                               logDescription, restoreState, 
alternativeIdx, restoreOptions.size());
+                               } else {
+                                       LOG.debug("Creating {} and restoring 
with state from alternative ({}/{}).",
+                                               logDescription, alternativeIdx, 
restoreOptions.size());
+                               }
+                       }
 
-               while (true) {
                        try {
-                               return attemptCreateAndRestore(attemptState);
+                               return attemptCreateAndRestore(restoreState);
                        } catch (Exception ex) {
-                               // more attempts?
-                               if (restoreOptions.hasNext()) {
 
-                                       attemptState = restoreOptions.next();
-                                       LOG.warn("Exception while restoring 
backend, will retry with another snapshot replica.", ex);
-                               } else {
+                               collectedException = 
ExceptionUtils.firstOrSuppressed(ex, collectedException);
 
-                                       throw new FlinkException("Could not 
restore from any of the provided restore options.", ex);
-                               }
+                               LOG.warn("Exception while restoring {} from 
alternative ({}/{}), will retry while more " +
+                                       "alternatives are available.", 
logDescription, alternativeIdx, restoreOptions.size(), ex);
                        }
                }
+
+               throw new FlinkException("Could not restore " + logDescription 
+ " from any of the " + restoreOptions.size() +
+                       " provided restore options.", collectedException);
        }
 
        private T attemptCreateAndRestore(Collection<S> restoreState) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 11e2dda..acbc2f8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -143,11 +143,11 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 
                        // -------------- Raw State Streams --------------
                        rawKeyedStateInputs = rawKeyedStateInputs(
-                               prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState());
+                               prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
                        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
 
                        rawOperatorStateInputs = rawOperatorStateInputs(
-                               prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState());
+                               prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
                        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
 
                        // -------------- Internal Timer Service Manager --------------
@@ -226,12 +226,16 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
                PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
 
+               String logDescription = "operator state backend for " + operatorIdentifierText;
+
                BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =
                        new BackendRestorerProcedure<>(
                                () -> stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
-                               backendCloseableRegistry);
+                               backendCloseableRegistry,
+                               logDescription);
 
-               return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
+               return backendRestorer.createAndRestore(
+                       prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
        }
 
        protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
@@ -244,6 +248,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
                        return null;
                }
 
+               String logDescription = "keyed state backend for " + operatorIdentifierText;
+
                TaskInfo taskInfo = environment.getTaskInfo();
 
                final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
@@ -261,9 +267,11 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
                                        taskInfo.getMaxNumberOfParallelSubtasks(),
                                        keyGroupRange,
                                        environment.getTaskKvStateRegistry()),
-                                       backendCloseableRegistry);
+                               backendCloseableRegistry,
+                               logDescription);
 
-               return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
+               return backendRestorer.createAndRestore(
+                       prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
        }
 
        protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(

http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
index 2126f70..0f15d11 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
@@ -104,21 +104,17 @@ public class BackendRestorerProcedureTest extends TestLogger {
                        new StateObjectCollection<>(Collections.singletonList(firstFailHandle)),
                        new StateObjectCollection<>(Collections.singletonList(secondSuccessHandle)),
                        new StateObjectCollection<>(Collections.singletonList(thirdNotUsedHandle)));
-               Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator();
 
                BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
-                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry);
+                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, "test op state backend");
 
-               OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(iterator);
+               OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(sortedRestoreOptions);
                Assert.assertNotNull(restoredBackend);
 
                try {
-                       Assert.assertTrue(iterator.hasNext());
-                       Assert.assertTrue(thirdNotUsedHandle == iterator.next().iterator().next());
                        verify(firstFailHandle).openInputStream();
                        verify(secondSuccessHandle).openInputStream();
                        verifyZeroInteractions(thirdNotUsedHandle);
-                       Assert.assertFalse(iterator.hasNext());
 
                        ListState<Integer> listState = restoredBackend.getListState(stateDescriptor);
 
@@ -151,13 +147,12 @@ public class BackendRestorerProcedureTest extends TestLogger {
                        new StateObjectCollection<>(Collections.singletonList(firstFailHandle)),
                        new StateObjectCollection<>(Collections.singletonList(secondFailHandle)),
                        new StateObjectCollection<>(Collections.singletonList(thirdFailHandle)));
-               Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator();
 
                BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
-                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry);
+                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, "test op state backend");
 
                try {
-                       restorerProcedure.createAndRestore(iterator);
+                       restorerProcedure.createAndRestore(sortedRestoreOptions);
                        Assert.fail();
                } catch (Exception ignore) {
                }
@@ -165,7 +160,6 @@ public class BackendRestorerProcedureTest extends TestLogger {
                verify(firstFailHandle).openInputStream();
                verify(secondFailHandle).openInputStream();
                verify(thirdFailHandle).openInputStream();
-               Assert.assertFalse(iterator.hasNext());
        }
 
        /**
@@ -183,12 +177,12 @@ public class BackendRestorerProcedureTest extends TestLogger {
                        Collections.singletonList(new StateObjectCollection<>(Collections.singletonList(blockingRestoreHandle)));
 
                BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
-                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry);
+                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, "test op state backend");
 
                AtomicReference<Exception> exceptionReference = new AtomicReference<>(null);
                Thread restoreThread = new Thread(() -> {
                        try {
-                               restorerProcedure.createAndRestore(sortedRestoreOptions.iterator());
+                               restorerProcedure.createAndRestore(sortedRestoreOptions);
                        } catch (Exception e) {
                                exceptionReference.set(e);
                        }

Reply via email to