Repository: flink Updated Branches: refs/heads/master 8d180d5fa -> 56c756040
[hotfix] Improved logging for task local recovery Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56c75604 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56c75604 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56c75604 Branch: refs/heads/master Commit: 56c756040fc4b3f224c4e7c12d208a3ccf5a7c5e Parents: 8d180d5 Author: Stefan Richter <[email protected]> Authored: Mon Feb 26 18:03:14 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Tue Feb 27 15:28:11 2018 +0100 ---------------------------------------------------------------------- .../PrioritizedOperatorSubtaskState.java | 31 +++++---- .../TaskExecutorLocalStateStoresManager.java | 68 ++++++++++++++------ .../runtime/state/TaskLocalStateStoreImpl.java | 63 ++++++++++++------ .../runtime/state/TaskStateManagerImpl.java | 27 +++++--- .../PrioritizedOperatorSubtaskStateTest.java | 4 +- .../runtime/state/TaskStateManagerImplTest.java | 4 +- .../api/operators/BackendRestorerProcedure.java | 61 ++++++++++++------ .../StreamTaskStateInitializerImpl.java | 20 ++++-- .../operators/BackendRestorerProcedureTest.java | 18 ++---- 9 files changed, 193 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java index f48d311..512f912 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java @@ -27,7 +27,6 @@ import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.function.BiFunction; @@ -79,39 +78,39 @@ public class PrioritizedOperatorSubtaskState { // ----------------------------------------------------------------------------------------------------------------- /** - * Returns an iterator over all alternative snapshots to restore the managed operator state, in the order in which - * we should attempt to restore. + * Returns an immutable list with all alternative snapshots to restore the managed operator state, in the order in + * which we should attempt to restore. */ @Nonnull - public Iterator<StateObjectCollection<OperatorStateHandle>> getPrioritizedManagedOperatorState() { - return prioritizedManagedOperatorState.iterator(); + public List<StateObjectCollection<OperatorStateHandle>> getPrioritizedManagedOperatorState() { + return prioritizedManagedOperatorState; } /** - * Returns an iterator over all alternative snapshots to restore the raw operator state, in the order in which we - * should attempt to restore. + * Returns an immutable list with all alternative snapshots to restore the raw operator state, in the order in + * which we should attempt to restore. */ @Nonnull - public Iterator<StateObjectCollection<OperatorStateHandle>> getPrioritizedRawOperatorState() { - return prioritizedRawOperatorState.iterator(); + public List<StateObjectCollection<OperatorStateHandle>> getPrioritizedRawOperatorState() { + return prioritizedRawOperatorState; } /** - * Returns an iterator over all alternative snapshots to restore the managed keyed state, in the order in which we - * should attempt to restore. + * Returns an immutable list with all alternative snapshots to restore the managed keyed state, in the order in + * which we should attempt to restore. */ @Nonnull - public Iterator<StateObjectCollection<KeyedStateHandle>> getPrioritizedManagedKeyedState() { - return prioritizedManagedKeyedState.iterator(); + public List<StateObjectCollection<KeyedStateHandle>> getPrioritizedManagedKeyedState() { + return prioritizedManagedKeyedState; } /** - * Returns an iterator over all alternative snapshots to restore the raw keyed state, in the order in which we + * Returns an immutable list with all alternative snapshots to restore the raw keyed state, in the order in which we * should attempt to restore. */ @Nonnull - public Iterator<StateObjectCollection<KeyedStateHandle>> getPrioritizedRawKeyedState() { - return prioritizedRawKeyedState.iterator(); + public List<StateObjectCollection<KeyedStateHandle>> getPrioritizedRawKeyedState() { + return prioritizedRawKeyedState; } // ----------------------------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index a940aef..e7a7d8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -110,38 +110,68 @@ public class TaskExecutorLocalStateStoresManager { "register a new TaskLocalStateStore."); } - final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers = - this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new HashMap<>()); + Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers = + this.taskStateStoresByAllocationID.get(allocationID); + + if (taskStateManagers == null) { + taskStateManagers = new HashMap<>(); + this.taskStateStoresByAllocationID.put(allocationID, taskStateManagers); + + if (LOG.isDebugEnabled()) { + LOG.debug("Registered new allocation id {} for local state stores for job {}.", + allocationID, jobId); + } + } final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex); - // create the allocation base dirs, one inside each root dir. - File[] allocationBaseDirectories = allocationBaseDirectories(allocationID); + TaskLocalStateStoreImpl taskLocalStateStore = taskStateManagers.get(taskKey); - LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl( - allocationBaseDirectories, - jobId, - jobVertexID, - subtaskIndex); + if (taskLocalStateStore == null) { + + // create the allocation base dirs, one inside each root dir. + File[] allocationBaseDirectories = allocationBaseDirectories(allocationID); + + LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl( + allocationBaseDirectories, + jobId, + jobVertexID, + subtaskIndex); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig( - localRecoveryMode, - directoryProvider); + LocalRecoveryConfig localRecoveryConfig = + new LocalRecoveryConfig(localRecoveryMode, directoryProvider); - return taskStateManagers.computeIfAbsent( - taskKey, - k -> new TaskLocalStateStoreImpl( + taskLocalStateStore = new TaskLocalStateStoreImpl( jobId, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, - discardExecutor)); + discardExecutor); + + taskStateManagers.put(taskKey, taskLocalStateStore); + + if (LOG.isTraceEnabled()) { + LOG.trace("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.", + localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Found existing local state store for {} - {} - {} under allocation id {}.", + jobId, jobVertexID, subtaskIndex, allocationID); + } + } + + return taskLocalStateStore; } } public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Releasing local state under allocation id {}.", allocationID); + } + Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> cleanupLocalStores; synchronized (lock) { @@ -175,7 +205,7 @@ public class TaskExecutorLocalStateStoresManager { ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); - LOG.debug("Shutting down TaskExecutorLocalStateStoresManager."); + LOG.info("Shutting down TaskExecutorLocalStateStoresManager."); for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> entry : toRelease.entrySet()) { @@ -217,7 +247,7 @@ public class TaskExecutorLocalStateStoresManager { try { stateStore.dispose(); } catch (Exception disposeEx) { - LOG.warn("Exception while disposing local state store " + stateStore, disposeEx); + LOG.warn("Exception while disposing local state store {}.", stateStore, disposeEx); } } } @@ -233,7 +263,7 @@ public class TaskExecutorLocalStateStoresManager { try { FileUtils.deleteFileOrDirectory(directory); } catch (IOException e) { - LOG.warn("Exception while deleting local state directory for allocation " + allocationID, e); + LOG.warn("Exception while deleting local state directory for allocation id {}.", allocationID, e); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index 191c109..bb4f011 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -103,15 +103,15 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull Executor discardExecutor) { + this.lock = new Object(); + this.storedTaskStateByCheckpointID = new TreeMap<>(); this.jobID = jobID; this.allocationID = allocationID; this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; this.discardExecutor = discardExecutor; - this.lock = new Object(); - this.storedTaskStateByCheckpointID = new TreeMap<>(); - this.disposed = false; this.localRecoveryConfig = localRecoveryConfig; + this.disposed = false; } @Override @@ -123,8 +123,15 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { localState = NULL_DUMMY; } - LOG.info("Storing local state for checkpoint {}.", checkpointId); - LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState); + if (LOG.isTraceEnabled()) { + LOG.debug( + "Stored local state for checkpoint {} in subtask ({} - {} - {}) : {}.", + checkpointId, jobID, jobVertexID, subtaskIndex, localState); + } else if (LOG.isDebugEnabled()) { + LOG.debug( + "Stored local state for checkpoint {} in subtask ({} - {} - {})", + checkpointId, jobID, jobVertexID, subtaskIndex); + } Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16); @@ -148,10 +155,21 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { @Override @Nullable public TaskStateSnapshot retrieveLocalState(long checkpointID) { + + TaskStateSnapshot snapshot; synchronized (lock) { - TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID); - return snapshot != NULL_DUMMY ? snapshot : null; + snapshot = storedTaskStateByCheckpointID.get(checkpointID); } + + if (LOG.isTraceEnabled()) { + LOG.trace("Found entry for local state for checkpoint {} in subtask ({} - {} - {}) : {}", + checkpointID, jobID, jobVertexID, subtaskIndex, snapshot); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Found entry for local state for checkpoint {} in subtask ({} - {} - {})", + checkpointID, jobID, jobVertexID, subtaskIndex); + } + + return snapshot != NULL_DUMMY ? snapshot : null; } @Override @@ -163,7 +181,8 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { @Override public void confirmCheckpoint(long confirmedCheckpointId) { - LOG.debug("Received confirmation for checkpoint {}. Starting to prune history.", confirmedCheckpointId); + LOG.debug("Received confirmation for checkpoint {} in subtask ({} - {} - {}). Starting to prune history.", + confirmedCheckpointId, jobID, jobVertexID, subtaskIndex); final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>(); @@ -216,7 +235,8 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { try { deleteDirectory(subtaskBaseDirectory); } catch (IOException e) { - LOG.warn("Exception when deleting local recovery subtask base dir: " + subtaskBaseDirectory, e); + LOG.warn("Exception when deleting local recovery subtask base directory {} in subtask ({} - {} - {})", + subtaskBaseDirectory, jobID, jobVertexID, subtaskIndex, e); } } }, @@ -240,27 +260,32 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { */ private void discardLocalStateForCheckpoint(long checkpointID, TaskStateSnapshot o) { + if (LOG.isTraceEnabled()) { + LOG.trace("Discarding local task state snapshot of checkpoint {} for subtask ({} - {} - {}).", + checkpointID, jobID, jobVertexID, subtaskIndex); + } else { + LOG.debug("Discarding local task state snapshot {} of checkpoint {} for subtask ({} - {} - {}).", + o, checkpointID, jobID, jobVertexID, subtaskIndex); + } + try { - if (LOG.isTraceEnabled()) { - LOG.trace("Discarding local task state snapshot of checkpoint {} for {}/{}/{}.", - checkpointID, jobID, jobVertexID, subtaskIndex); - } else { - LOG.debug("Discarding local task state snapshot {} of checkpoint {} for {}/{}/{}.", - o, checkpointID, jobID, jobVertexID, subtaskIndex); - } o.discardState(); } catch (Exception discardEx) { - LOG.warn("Exception while discarding local task state snapshot of checkpoint " + checkpointID + ".", discardEx); + LOG.warn("Exception while discarding local task state snapshot of checkpoint {} in subtask ({} - {} - {}).", + checkpointID, jobID, jobVertexID, subtaskIndex, discardEx); } LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID); - LOG.debug("Deleting local state directory {} of checkpoint {} for {}/{}/{}/{}.", + + LOG.debug("Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).", checkpointDir, checkpointID, jobID, jobVertexID, subtaskIndex); + try { deleteDirectory(checkpointDir); } catch (IOException ex) { - LOG.warn("Exception while deleting local state directory of checkpoint " + checkpointID + ".", ex); + LOG.warn("Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).", + checkpointID, jobID, jobVertexID, subtaskIndex, ex); } } http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index 3acca7c..e057d19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -29,10 +29,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collections; +import java.util.List; /** * This class is the default implementation of {@link TaskStateManager} and collaborates with the job manager @@ -40,13 +44,14 @@ import java.util.Collections; * not have to deal with the differences between remote or local state on recovery because this class handles both * cases transparently. * - * Reported state is tagged by clients so that this class can properly forward to the right receiver for the + * <p>Reported state is tagged by clients so that this class can properly forward to the right receiver for the * checkpointed state. - * - * TODO: all interaction with local state store must still be implemented! It is currently just a placeholder. */ public class TaskStateManagerImpl implements TaskStateManager { + /** The logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(TaskStateManagerImpl.class); + /** The id of the job for which this manager was created, can report, and recover. */ private final JobID jobId; @@ -117,21 +122,27 @@ public class TaskStateManagerImpl implements TaskStateManager { TaskStateSnapshot localStateSnapshot = localStateStore.retrieveLocalState(jobManagerTaskRestore.getRestoreCheckpointId()); + List<OperatorSubtaskState> alternativesByPriority = Collections.emptyList(); + if (localStateSnapshot != null) { OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID); if (localSubtaskState != null) { - PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder( - jobManagerSubtaskState, - Collections.singletonList(localSubtaskState)); - return builder.build(); + alternativesByPriority = Collections.singletonList(localSubtaskState); } } + if (LOG.isTraceEnabled()) { + LOG.trace("Operator {} has remote state {} from job manager and local state alternatives {} from local " + + "state store {}.", + operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore); + } + PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder( jobManagerSubtaskState, - Collections.emptyList(), + alternativesByPriority, true); + return builder.build(); } http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java index 09c9efb..82082e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java @@ -216,7 +216,7 @@ public class PrioritizedOperatorSubtaskStateTest extends TestLogger { private <T extends StateObject> boolean checkResultAsExpected( Function<OperatorSubtaskState, StateObjectCollection<T>> extractor, - Function<PrioritizedOperatorSubtaskState, Iterator<StateObjectCollection<T>>> extractor2, + Function<PrioritizedOperatorSubtaskState, List<StateObjectCollection<T>>> extractor2, PrioritizedOperatorSubtaskState prioritizedResult, OperatorSubtaskState... expectedOrdered) { @@ -226,7 +226,7 @@ public class PrioritizedOperatorSubtaskStateTest extends TestLogger { } return checkRepresentSameOrder( - extractor2.apply(prioritizedResult), + extractor2.apply(prioritizedResult).iterator(), collector.toArray(new StateObjectCollection[collector.size()])); } http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java index 926c196..f58f3f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java @@ -142,7 +142,7 @@ public class TaskStateManagerImplTest extends TestLogger { // checks for operator 1. Iterator<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState_1 = - prioritized_1.getPrioritizedManagedKeyedState(); + prioritized_1.getPrioritizedManagedKeyedState().iterator(); Assert.assertTrue(prioritizedManagedKeyedState_1.hasNext()); StateObjectCollection<KeyedStateHandle> current = prioritizedManagedKeyedState_1.next(); @@ -158,7 +158,7 @@ public class TaskStateManagerImplTest extends TestLogger { // checks for operator 2. Iterator<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState_2 = - prioritized_2.getPrioritizedRawKeyedState(); + prioritized_2.getPrioritizedRawKeyedState().iterator(); Assert.assertTrue(prioritizedRawKeyedState_2.hasNext()); current = prioritizedRawKeyedState_2.next(); http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java index ba27a0a..dd75fb2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java @@ -36,7 +36,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; +import java.util.List; /** * This class implements the logic that creates (and potentially restores) a state backend. The restore logic @@ -62,6 +62,9 @@ public class BackendRestorerProcedure< /** This registry is used so that recovery can participate in the task lifecycle, i.e. can be canceled. */ private final CloseableRegistry backendCloseableRegistry; + /** Description of this instance for logging. */ + private final String logDescription; + /** * Creates a new backend restorer using the given backend supplier and the closeable registry. * @@ -70,43 +73,63 @@ public class BackendRestorerProcedure< */ public BackendRestorerProcedure( @Nonnull SupplierWithException<T, Exception> instanceSupplier, - @Nonnull CloseableRegistry backendCloseableRegistry) { + @Nonnull CloseableRegistry backendCloseableRegistry, + @Nonnull String logDescription) { this.instanceSupplier = Preconditions.checkNotNull(instanceSupplier); this.backendCloseableRegistry = Preconditions.checkNotNull(backendCloseableRegistry); + this.logDescription = logDescription; } /** * Creates a new state backend and restores it from the provided set of state snapshot alternatives. * - * @param restoreOptions iterator over a prioritized set of state snapshot alternatives for recovery. + * @param restoreOptions list of prioritized state snapshot alternatives for recovery. * @return the created (and restored) state backend. * @throws Exception if the backend could not be created or restored. */ - public @Nonnull - T createAndRestore(@Nonnull Iterator<? extends Collection<S>> restoreOptions) throws Exception { + @Nonnull + public T createAndRestore(@Nonnull List<? extends Collection<S>> restoreOptions) throws Exception { + + if (restoreOptions.isEmpty()) { + restoreOptions = Collections.singletonList(Collections.emptyList()); + } + + int alternativeIdx = 0; + + Exception collectedException = null; + + while (alternativeIdx < restoreOptions.size()) { + + Collection<S> restoreState = restoreOptions.get(alternativeIdx); - // This ensures that we always call the restore method even if there is no previous state - // (required by some backends). - Collection<S> attemptState = restoreOptions.hasNext() ? - restoreOptions.next() : - Collections.emptyList(); + ++alternativeIdx; + + if (restoreState.isEmpty()) { + LOG.debug("Creating {} with empty state.", logDescription); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating {} and restoring with state {} from alternative ({}/{}).", + logDescription, restoreState, alternativeIdx, restoreOptions.size()); + } else { + LOG.debug("Creating {} and restoring with state from alternative ({}/{}).", + logDescription, alternativeIdx, restoreOptions.size()); + } + } - while (true) { try { - return attemptCreateAndRestore(attemptState); + return attemptCreateAndRestore(restoreState); } catch (Exception ex) { - // more attempts? - if (restoreOptions.hasNext()) { - attemptState = restoreOptions.next(); - LOG.warn("Exception while restoring backend, will retry with another snapshot replica.", ex); - } else { + collectedException = ExceptionUtils.firstOrSuppressed(ex, collectedException); - throw new FlinkException("Could not restore from any of the provided restore options.", ex); - } + LOG.warn("Exception while restoring {} from alternative ({}/{}), will retry while more " + + "alternatives are available.", logDescription, alternativeIdx, restoreOptions.size(), ex); } } + + throw new FlinkException("Could not restore " + logDescription + " from any of the " + restoreOptions.size() + + " provided restore options.", collectedException); } private T attemptCreateAndRestore(Collection<S> restoreState) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 11e2dda..acbc2f8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -143,11 +143,11 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize // -------------- Raw State Streams -------------- rawKeyedStateInputs = rawKeyedStateInputs( - prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState()); + prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs = rawOperatorStateInputs( - prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState()); + prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); // -------------- Internal Timer Service Manager -------------- @@ -226,12 +226,16 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry) throws Exception { + String logDescription = "operator state backend for " + operatorIdentifierText; + BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer = new BackendRestorerProcedure<>( () -> stateBackend.createOperatorStateBackend(environment, operatorIdentifierText), - backendCloseableRegistry); + backendCloseableRegistry, + logDescription); - return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState()); + return backendRestorer.createAndRestore( + prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState()); } protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend( @@ -244,6 +248,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize return null; } + String logDescription = "keyed state backend for " + operatorIdentifierText; + TaskInfo taskInfo = environment.getTaskInfo(); final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( @@ -261,9 +267,11 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize taskInfo.getMaxNumberOfParallelSubtasks(), keyGroupRange, environment.getTaskKvStateRegistry()), - backendCloseableRegistry); + backendCloseableRegistry, + logDescription); - return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState()); + return backendRestorer.createAndRestore( + prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState()); } protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs( http://git-wip-us.apache.org/repos/asf/flink/blob/56c75604/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java index 2126f70..0f15d11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java @@ -104,21 +104,17 @@ public class BackendRestorerProcedureTest extends TestLogger { new StateObjectCollection<>(Collections.singletonList(firstFailHandle)), new StateObjectCollection<>(Collections.singletonList(secondSuccessHandle)), new StateObjectCollection<>(Collections.singletonList(thirdNotUsedHandle))); - Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator(); BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure = - new BackendRestorerProcedure<>(backendSupplier, closeableRegistry); + new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, "test op state backend"); - OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(iterator); + OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(sortedRestoreOptions); Assert.assertNotNull(restoredBackend); try { - Assert.assertTrue(iterator.hasNext()); - Assert.assertTrue(thirdNotUsedHandle == iterator.next().iterator().next()); verify(firstFailHandle).openInputStream(); verify(secondSuccessHandle).openInputStream(); verifyZeroInteractions(thirdNotUsedHandle); - Assert.assertFalse(iterator.hasNext()); ListState<Integer> listState = restoredBackend.getListState(stateDescriptor); @@ -151,13 +147,12 @@ public class BackendRestorerProcedureTest extends TestLogger { new StateObjectCollection<>(Collections.singletonList(firstFailHandle)), new StateObjectCollection<>(Collections.singletonList(secondFailHandle)), new StateObjectCollection<>(Collections.singletonList(thirdFailHandle))); - Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator(); BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure = - new BackendRestorerProcedure<>(backendSupplier, closeableRegistry); + new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, "test op state backend"); try { - restorerProcedure.createAndRestore(iterator); + restorerProcedure.createAndRestore(sortedRestoreOptions); Assert.fail(); } catch (Exception ignore) { } @@ -165,7 +160,6 @@ public class BackendRestorerProcedureTest extends TestLogger { verify(firstFailHandle).openInputStream(); verify(secondFailHandle).openInputStream(); verify(thirdFailHandle).openInputStream(); - Assert.assertFalse(iterator.hasNext()); } /** @@ -183,12 +177,12 @@ public class BackendRestorerProcedureTest extends TestLogger { Collections.singletonList(new StateObjectCollection<>(Collections.singletonList(blockingRestoreHandle))); BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure = - new BackendRestorerProcedure<>(backendSupplier, closeableRegistry); + new BackendRestorerProcedure<>(backendSupplier, closeableRegistry, "test op state backend"); AtomicReference<Exception> exceptionReference = new AtomicReference<>(null); Thread restoreThread = new Thread(() -> { try { - restorerProcedure.createAndRestore(sortedRestoreOptions.iterator()); + restorerProcedure.createAndRestore(sortedRestoreOptions); } catch (Exception e) { exceptionReference.set(e); }
