http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index 4277348..44ca9d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -23,6 +23,8 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + import java.io.Closeable; import java.io.IOException; import java.util.concurrent.RunnableFuture; @@ -31,8 +33,11 @@ import java.util.concurrent.RunnableFuture; * This class is a default implementation for StateSnapshotContext. */ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable { - + + /** Checkpoint id of the snapshot. */ private final long checkpointId; + + /** Checkpoint timestamp of the snapshot. */ private final long checkpointTimestamp; /** Factory for the checkpointing stream */ @@ -47,7 +52,10 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext */ private final CloseableRegistry closableRegistry; + /** Output stream for the raw keyed state. */ private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream; + + /** Output stream for the raw operator state. 
*/ private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream; @VisibleForTesting @@ -109,14 +117,23 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext return operatorStateCheckpointOutputStream; } - public RunnableFuture<KeyedStateHandle> getKeyedStateStreamFuture() throws IOException { - KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream); - return new DoneFuture<KeyedStateHandle>(keyGroupsStateHandle); + @Nonnull + public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException { + KeyedStateHandle keyGroupsStateHandle = + closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream); + return toDoneFutureOfSnapshotResult(keyGroupsStateHandle); } - public RunnableFuture<OperatorStateHandle> getOperatorStateStreamFuture() throws IOException { - OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream); - return new DoneFuture<>(operatorStateHandle); + @Nonnull + public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException { + OperatorStateHandle operatorStateHandle = + closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream); + return toDoneFutureOfSnapshotResult(operatorStateHandle); + } + + private <T extends StateObject> RunnableFuture<SnapshotResult<T>> toDoneFutureOfSnapshotResult(T handle) { + SnapshotResult<T> snapshotResult = SnapshotResult.of(handle); + return DoneFuture.of(snapshotResult); } private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle( @@ -130,7 +147,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext } private <T extends StreamStateHandle> void closeAndUnregisterStream( - NonClosingCheckpointOutputStream<T> stream) throws IOException { + 
NonClosingCheckpointOutputStream<? extends T> stream) throws IOException { Preconditions.checkNotNull(stream); @@ -149,9 +166,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext try { closeAndUnregisterStream(keyedStateCheckpointOutputStream); } catch (IOException e) { - exception = ExceptionUtils.firstOrSuppressed( - new IOException("Could not close the raw keyed state checkpoint output stream.", e), - exception); + exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index 326b95c..a940aef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -18,81 +18,241 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.ShutdownHookUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import java.io.File; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; /** - * This class holds the all {@link TaskLocalStateStore} objects for a task executor (manager). - * - * TODO: this still still work in progress and partially still acts as a placeholder. + * This class holds all the {@link TaskLocalStateStoreImpl} objects for a task executor (manager). */ public class TaskExecutorLocalStateStoresManager { + /** Logger for this class. 
*/ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class); + /** * This map holds all local state stores for tasks running on the task manager / executor that own the instance of - * this. + * this. Maps from allocation id to all the subtask's local state stores. */ - private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> taskStateManagers; + @GuardedBy("lock") + private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> taskStateStoresByAllocationID; + + /** The configured mode for local recovery on this task manager. */ + private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode; + + /** This is the root directory for all local state of this task manager / executor. */ + private final File[] localStateRootDirectories; + + /** Executor that runs the discarding of released state objects. */ + private final Executor discardExecutor; + + /** Guarding lock for taskStateStoresByAllocationID and closed-flag. 
*/ + private final Object lock; + + private final Thread shutdownHook; + + @GuardedBy("lock") + private boolean closed; + + public TaskExecutorLocalStateStoresManager( + @Nonnull LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode, + @Nonnull File[] localStateRootDirectories, + @Nonnull Executor discardExecutor) throws IOException { + + this.taskStateStoresByAllocationID = new HashMap<>(); + this.localRecoveryMode = localRecoveryMode; + this.localStateRootDirectories = localStateRootDirectories; + this.discardExecutor = discardExecutor; + this.lock = new Object(); + this.closed = false; + + for (File localStateRecoveryRootDir : localStateRootDirectories) { + if (!localStateRecoveryRootDir.exists()) { + + if (!localStateRecoveryRootDir.mkdirs()) { + throw new IOException("Could not create root directory for local recovery: " + + localStateRecoveryRootDir); + } + } + } - public TaskExecutorLocalStateStoresManager() { - this.taskStateManagers = new HashMap<>(); + // register a shutdown hook + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); } - public TaskLocalStateStore localStateStoreForTask( - JobID jobId, - JobVertexID jobVertexID, - int subtaskIndex) { + @Nonnull + public TaskLocalStateStore localStateStoreForSubtask( + @Nonnull JobID jobId, + @Nonnull AllocationID allocationID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex) { + + synchronized (lock) { + + if (closed) { + throw new IllegalStateException("TaskExecutorLocalStateStoresManager is already closed and cannot " + + "register a new TaskLocalStateStore."); + } + + final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers = + this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new HashMap<>()); + + final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex); + + // create the allocation base dirs, one inside each root dir. 
+ File[] allocationBaseDirectories = allocationBaseDirectories(allocationID); - Preconditions.checkNotNull(jobId); - final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex); + LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl( + allocationBaseDirectories, + jobId, + jobVertexID, + subtaskIndex); - final Map<JobVertexSubtaskKey, TaskLocalStateStore> taskStateManagers = - this.taskStateManagers.computeIfAbsent(jobId, k -> new HashMap<>()); + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig( + localRecoveryMode, + directoryProvider); - return taskStateManagers.computeIfAbsent( - taskKey, k -> new TaskLocalStateStore(jobId, jobVertexID, subtaskIndex)); + return taskStateManagers.computeIfAbsent( + taskKey, + k -> new TaskLocalStateStoreImpl( + jobId, + allocationID, + jobVertexID, + subtaskIndex, + localRecoveryConfig, + discardExecutor)); + } } - public void releaseJob(JobID jobID) { + public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) { + + Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> cleanupLocalStores; - Map<JobVertexSubtaskKey, TaskLocalStateStore> cleanupLocalStores = taskStateManagers.remove(jobID); + synchronized (lock) { + if (closed) { + return; + } + cleanupLocalStores = taskStateStoresByAllocationID.remove(allocationID); + } if (cleanupLocalStores != null) { doRelease(cleanupLocalStores.values()); } + + cleanupAllocationBaseDirs(allocationID); } - public void releaseAll() { + public void shutdown() { + + HashMap<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> toRelease; + + synchronized (lock) { - for (Map<JobVertexSubtaskKey, TaskLocalStateStore> stateStoreMap : taskStateManagers.values()) { - doRelease(stateStoreMap.values()); + if (closed) { + return; + } + + closed = true; + toRelease = new HashMap<>(taskStateStoresByAllocationID); + taskStateStoresByAllocationID.clear(); + } + + 
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); + + LOG.debug("Shutting down TaskExecutorLocalStateStoresManager."); + + for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> entry : + toRelease.entrySet()) { + + doRelease(entry.getValue().values()); + cleanupAllocationBaseDirs(entry.getKey()); } + } + + @VisibleForTesting + public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + @VisibleForTesting + File[] getLocalStateRootDirectories() { + return localStateRootDirectories; + } + + @VisibleForTesting + String allocationSubDirString(AllocationID allocationID) { + return "aid_" + allocationID; + } - taskStateManagers.clear(); + private File[] allocationBaseDirectories(AllocationID allocationID) { + final String allocationSubDirString = allocationSubDirString(allocationID); + final File[] allocationDirectories = new File[localStateRootDirectories.length]; + for (int i = 0; i < localStateRootDirectories.length; ++i) { + allocationDirectories[i] = new File(localStateRootDirectories[i], allocationSubDirString); + } + return allocationDirectories; } - private void doRelease(Iterable<TaskLocalStateStore> toRelease) { + private void doRelease(Iterable<TaskLocalStateStoreImpl> toRelease) { + if (toRelease != null) { - for (TaskLocalStateStore stateStore : toRelease) { - stateStore.dispose(); + + for (TaskLocalStateStoreImpl stateStore : toRelease) { + try { + stateStore.dispose(); + } catch (Exception disposeEx) { + LOG.warn("Exception while disposing local state store " + stateStore, disposeEx); + } + } + } + } + + /** + * Deletes the base dirs for this allocation id (recursively). + */ + private void cleanupAllocationBaseDirs(AllocationID allocationID) { + // clear the base dirs for this allocation id. 
+ File[] allocationDirectories = allocationBaseDirectories(allocationID); + for (File directory : allocationDirectories) { + try { + FileUtils.deleteFileOrDirectory(directory); + } catch (IOException e) { + LOG.warn("Exception while deleting local state directory for allocation " + allocationID, e); } } } + /** + * Composite key of {@link JobVertexID} and subtask index that describes the subtask of a job vertex. + */ private static final class JobVertexSubtaskKey { + /** The job vertex id. */ @Nonnull final JobVertexID jobVertexID; + + /** The subtask index. */ + @Nonnegative final int subtaskIndex; - public JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, int subtaskIndex) { - this.jobVertexID = Preconditions.checkNotNull(jobVertexID); + JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex) { + this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; } @@ -107,10 +267,7 @@ public class TaskExecutorLocalStateStoresManager { JobVertexSubtaskKey that = (JobVertexSubtaskKey) o; - if (subtaskIndex != that.subtaskIndex) { - return false; - } - return jobVertexID.equals(that.jobVertexID); + return subtaskIndex == that.subtaskIndex && jobVertexID.equals(that.jobVertexID); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java index f743630..7089894 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java @@ -18,44 +18,48 @@ package org.apache.flink.runtime.state; -import org.apache.flink.api.common.JobID; -import 
org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** - * This class will service as a task-manager-level local storage for local checkpointed state. The purpose is to provide - * access to a state that is stored locally for a faster recovery compared to the state that is stored remotely in a - * stable store DFS. For now, this storage is only complementary to the stable storage and local state is typically - * lost in case of machine failures. In such cases (and others), client code of this class must fall back to using the - * slower but highly available store. - * - * TODO this is currently a placeholder / mock that still must be implemented! + * Classes that implement this interface serve as a task-manager-level local storage for local checkpointed state. + * The purpose is to provide access to a state that is stored locally for a faster recovery compared to the state that + * is stored remotely in a stable store DFS. For now, this storage is only complementary to the stable storage and local + * state is typically lost in case of machine failures. In such cases (and others), client code of this class must fall + * back to using the slower but highly available store. */ -public class TaskLocalStateStore { - - /** */ - private final JobID jobID; - - /** */ - private final JobVertexID jobVertexID; - - /** */ - private final int subtaskIndex; - - public TaskLocalStateStore( - JobID jobID, - JobVertexID jobVertexID, - int subtaskIndex) { +public interface TaskLocalStateStore { + /** + * Stores the local state for the given checkpoint id. + * + * @param checkpointId id for the checkpoint that created the local state that will be stored. + * @param localState the local state to store. 
+ */ + void storeLocalState( + @Nonnegative long checkpointId, + @Nullable TaskStateSnapshot localState); - this.jobID = jobID; - this.jobVertexID = jobVertexID; - this.subtaskIndex = subtaskIndex; - } + /** + * Returns the local state that is stored under the given checkpoint id or null if nothing was stored under the id. + * + * @param checkpointID the checkpoint id by which we search for local state. + * @return the local state found for the given checkpoint id. Can be null + */ + @Nullable + TaskStateSnapshot retrieveLocalState(long checkpointID); - public void storeSnapshot(/* TODO */) { - throw new UnsupportedOperationException("TODO!"); - } + /** + * Returns the {@link LocalRecoveryConfig} for this task local state store. + */ + @Nonnull + LocalRecoveryConfig getLocalRecoveryConfig(); - public void dispose() { - throw new UnsupportedOperationException("TODO!"); - } + /** + * Notifies that the checkpoint with the given id was confirmed as complete. This prunes the checkpoint history + * and removes all local states with a checkpoint id that is smaller than the newly confirmed checkpoint id. + */ + void confirmCheckpoint(long confirmedCheckpointId); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java new file mode 100644 index 0000000..191c109 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +/** + * Main implementation of a {@link TaskLocalStateStore}. + */ +public class TaskLocalStateStoreImpl implements TaskLocalStateStore { + + /** Logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class); + + /** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. 
*/ + private static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0); + + /** JobID from the owning subtask. */ + @Nonnull + private final JobID jobID; + + /** AllocationID of the owning slot. */ + @Nonnull + private final AllocationID allocationID; + + /** JobVertexID of the owning subtask. */ + @Nonnull + private final JobVertexID jobVertexID; + + /** Subtask index of the owning subtask. */ + @Nonnegative + private final int subtaskIndex; + + /** The configured mode for local recovery. */ + @Nonnull + private final LocalRecoveryConfig localRecoveryConfig; + + /** Executor that runs the discarding of released state objects. */ + @Nonnull + private final Executor discardExecutor; + + /** Lock for synchronisation on the storage map and the discarded status. */ + @Nonnull + private final Object lock; + + /** Status flag if this store was already discarded. */ + @GuardedBy("lock") + private boolean disposed; + + /** Maps checkpoint ids to local TaskStateSnapshots. */ + @Nonnull + @GuardedBy("lock") + private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID; + + public TaskLocalStateStoreImpl( + @Nonnull JobID jobID, + @Nonnull AllocationID allocationID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex, + @Nonnull LocalRecoveryConfig localRecoveryConfig, + @Nonnull Executor discardExecutor) { + + this.jobID = jobID; + this.allocationID = allocationID; + this.jobVertexID = jobVertexID; + this.subtaskIndex = subtaskIndex; + this.discardExecutor = discardExecutor; + this.lock = new Object(); + this.storedTaskStateByCheckpointID = new TreeMap<>(); + this.disposed = false; + this.localRecoveryConfig = localRecoveryConfig; + } + + @Override + public void storeLocalState( + @Nonnegative long checkpointId, + @Nullable TaskStateSnapshot localState) { + + if (localState == null) { + localState = NULL_DUMMY; + } + + LOG.info("Storing local state for checkpoint {}.", checkpointId); + LOG.debug("Local state for checkpoint {} is 
{}.", checkpointId, localState); + + Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16); + + synchronized (lock) { + if (disposed) { + // we ignore late stores and simply discard the state. + toDiscard.put(checkpointId, localState); + } else { + TaskStateSnapshot previous = + storedTaskStateByCheckpointID.put(checkpointId, localState); + + if (previous != null) { + toDiscard.put(checkpointId, previous); + } + } + } + + asyncDiscardLocalStateForCollection(toDiscard.entrySet()); + } + + @Override + @Nullable + public TaskStateSnapshot retrieveLocalState(long checkpointID) { + synchronized (lock) { + TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID); + return snapshot != NULL_DUMMY ? snapshot : null; + } + } + + @Override + @Nonnull + public LocalRecoveryConfig getLocalRecoveryConfig() { + return localRecoveryConfig; + } + + @Override + public void confirmCheckpoint(long confirmedCheckpointId) { + + LOG.debug("Received confirmation for checkpoint {}. Starting to prune history.", confirmedCheckpointId); + + final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>(); + + synchronized (lock) { + + Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator = + storedTaskStateByCheckpointID.entrySet().iterator(); + + // remove entries for outdated checkpoints and discard their state. + while (entryIterator.hasNext()) { + + Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next(); + long entryCheckpointId = snapshotEntry.getKey(); + + if (entryCheckpointId < confirmedCheckpointId) { + toRemove.add(snapshotEntry); + entryIterator.remove(); + } else { + // we can stop because the map is sorted. + break; + } + } + } + + asyncDiscardLocalStateForCollection(toRemove); + } + + /** + * Disposes the state of all local snapshots managed by this object. 
+ */ + public CompletableFuture<Void> dispose() { + + Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy; + + synchronized (lock) { + disposed = true; + statesCopy = new ArrayList<>(storedTaskStateByCheckpointID.entrySet()); + storedTaskStateByCheckpointID.clear(); + } + + return CompletableFuture.runAsync( + () -> { + // discard all remaining state objects. + syncDiscardLocalStateForCollection(statesCopy); + + // delete the local state subdirectory that belongs to this subtask. + LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); + for (int i = 0; i < directoryProvider.allocationBaseDirsCount(); ++i) { + File subtaskBaseDirectory = directoryProvider.selectSubtaskBaseDirectory(i); + try { + deleteDirectory(subtaskBaseDirectory); + } catch (IOException e) { + LOG.warn("Exception when deleting local recovery subtask base dir: " + subtaskBaseDirectory, e); + } + } + }, + discardExecutor); + } + + private void asyncDiscardLocalStateForCollection(Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) { + if (!toDiscard.isEmpty()) { + discardExecutor.execute(() -> syncDiscardLocalStateForCollection(toDiscard)); + } + } + + private void syncDiscardLocalStateForCollection(Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) { + for (Map.Entry<Long, TaskStateSnapshot> entry : toDiscard) { + discardLocalStateForCheckpoint(entry.getKey(), entry.getValue()); + } + } + + /** + * Helper method that discards state objects with an executor and reports exceptions to the log. 
+ */ + private void discardLocalStateForCheckpoint(long checkpointID, TaskStateSnapshot o) { + + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Discarding local task state snapshot of checkpoint {} for {}/{}/{}.", + checkpointID, jobID, jobVertexID, subtaskIndex); + } else { + LOG.debug("Discarding local task state snapshot {} of checkpoint {} for {}/{}/{}.", + o, checkpointID, jobID, jobVertexID, subtaskIndex); + } + o.discardState(); + } catch (Exception discardEx) { + LOG.warn("Exception while discarding local task state snapshot of checkpoint " + checkpointID + ".", discardEx); + } + + LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); + File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID); + LOG.debug("Deleting local state directory {} of checkpoint {} for {}/{}/{}/{}.", + checkpointDir, checkpointID, jobID, jobVertexID, subtaskIndex); + try { + deleteDirectory(checkpointDir); + } catch (IOException ex) { + LOG.warn("Exception while deleting local state directory of checkpoint " + checkpointID + ".", ex); + } + } + + /** + * Helper method to delete a directory. 
+ */ + private void deleteDirectory(File directory) throws IOException { + Path path = new Path(directory.toURI()); + FileSystem fileSystem = path.getFileSystem(); + if (fileSystem.exists(path)) { + fileSystem.delete(path, true); + } + } + + @Override + public String toString() { + return "TaskLocalStateStore{" + + "jobID=" + jobID + + ", jobVertexID=" + jobVertexID + + ", allocationID=" + allocationID + + ", subtaskIndex=" + subtaskIndex + + ", localRecoveryConfig=" + localRecoveryConfig + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java index 8b41e9e..82591ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -44,19 +44,29 @@ public interface TaskStateManager extends CheckpointListener { * Report the state snapshots for the operator instances running in the owning task. * * @param checkpointMetaData meta data from the checkpoint request. - * @param checkpointMetrics task level metrics for the checkpoint. - * @param acknowledgedState the reported states from the owning task. + * @param checkpointMetrics task level metrics for the checkpoint. 
+ * @param acknowledgedState the reported states to acknowledge to the job manager. + * @param localState the reported states for local recovery. */ - void reportStateHandles( + void reportTaskStateSnapshots( @Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, - @Nullable TaskStateSnapshot acknowledgedState); + @Nullable TaskStateSnapshot acknowledgedState, + @Nullable TaskStateSnapshot localState); /** * Returns means to restore previously reported state of an operator running in the owning task. * * @param operatorID the id of the operator for which we request state. - * @return previous state for the operator. Null if no previous state exists. + * @return Previous state for the operator. The previous state can be empty if the operator had no previous state. */ - OperatorSubtaskState operatorStates(OperatorID operatorID); + @Nonnull + PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID); + + /** + * Returns the configuration for local recovery, i.e. the base directories for all file-based local state of the + * owning subtask and the general mode for local recovery. 
+ */ + @Nonnull + LocalRecoveryConfig createLocalRecoveryConfig(); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index 3cd66fb..3acca7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -31,6 +32,8 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Collections; + /** * This class is the default implementation of {@link TaskStateManager} and collaborates with the job manager * through {@link CheckpointResponder} as well as a task-manager-local state store. 
Like this, client code does @@ -75,35 +78,67 @@ public class TaskStateManagerImpl implements TaskStateManager { } @Override - public void reportStateHandles( + public void reportTaskStateSnapshots( @Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, - @Nullable TaskStateSnapshot acknowledgedState) { + @Nullable TaskStateSnapshot acknowledgedState, + @Nullable TaskStateSnapshot localState) { + + long checkpointId = checkpointMetaData.getCheckpointId(); + + localStateStore.storeLocalState(checkpointId, localState); checkpointResponder.acknowledgeCheckpoint( jobId, executionAttemptID, - checkpointMetaData.getCheckpointId(), + checkpointId, checkpointMetrics, acknowledgedState); } + @Nonnull @Override - public OperatorSubtaskState operatorStates(OperatorID operatorID) { + public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) { if (jobManagerTaskRestore == null) { - return null; + return PrioritizedOperatorSubtaskState.emptyNotRestored(); + } + + TaskStateSnapshot jobManagerStateSnapshot = + jobManagerTaskRestore.getTaskStateSnapshot(); + + OperatorSubtaskState jobManagerSubtaskState = + jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID); + + if (jobManagerSubtaskState == null) { + return PrioritizedOperatorSubtaskState.emptyNotRestored(); } - TaskStateSnapshot taskStateSnapshot = jobManagerTaskRestore.getTaskStateSnapshot(); - return taskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + TaskStateSnapshot localStateSnapshot = + localStateStore.retrieveLocalState(jobManagerTaskRestore.getRestoreCheckpointId()); - /* - TODO!!!!!!! - 1) lookup local states for a matching operatorID / checkpointID. - 2) if nothing available: look into job manager provided state. - 3) massage it into a snapshots and return stuff. 
- */ + if (localStateSnapshot != null) { + OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID); + + if (localSubtaskState != null) { + PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder( + jobManagerSubtaskState, + Collections.singletonList(localSubtaskState)); + return builder.build(); + } + } + + PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder( + jobManagerSubtaskState, + Collections.emptyList(), + true); + return builder.build(); + } + + @Nonnull + @Override + public LocalRecoveryConfig createLocalRecoveryConfig() { + return localStateStore.getLocalRecoveryConfig(); } /** @@ -111,6 +146,6 @@ public class TaskStateManagerImpl implements TaskStateManager { */ @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - //TODO activate and prune local state later + localStateStore.confirmCheckpoint(checkpointId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java new file mode 100644 index 0000000..054a98c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link CheckpointStateOutputStream} that writes into a specified file and + * returns a {@link FileStateHandle} upon closing. + * + * <p>Unlike the {@link org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream}, + * this stream does not have a threshold below which it returns a memory byte stream handle, + * and does not create random files, but writes to a specified file. 
+ */ +public final class FileBasedStateOutputStream extends CheckpointStateOutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(FileBasedStateOutputStream.class); + + // ------------------------------------------------------------------------ + + private final FSDataOutputStream out; + + private final Path path; + + private final FileSystem fileSystem; + + private volatile boolean closed; + + + public FileBasedStateOutputStream(FileSystem fileSystem, Path path) throws IOException { + this.fileSystem = checkNotNull(fileSystem); + this.path = checkNotNull(path); + + this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE); + } + + // ------------------------------------------------------------------------ + // I/O + // ------------------------------------------------------------------------ + + @Override + public final void write(int b) throws IOException { + out.write(b); + } + + @Override + public final void write(@Nonnull byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public long getPos() throws IOException { + return out.getPos(); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void sync() throws IOException { + out.sync(); + } + + // ------------------------------------------------------------------------ + // Closing + // ------------------------------------------------------------------------ + + public boolean isClosed() { + return closed; + } + + @Override + public void close() { + if (!closed) { + closed = true; + + try { + out.close(); + fileSystem.delete(path, false); + } + catch (Throwable t) { + LOG.warn("Could not close the state stream for {}.", path, t); + } + } + } + + @Nullable + @Override + public FileStateHandle closeAndGetHandle() throws IOException { + synchronized (this) { + if (!closed) { + try { + // make a best effort attempt to figure out the size + long size = 0; + try { + size = out.getPos(); + } catch 
(Exception ignored) {} + + // close and return + out.close(); + + return new FileStateHandle(path, size); + } + catch (Exception e) { + try { + fileSystem.delete(path, false); + } + catch (Exception deleteException) { + LOG.warn("Could not delete the checkpoint stream file {}.", path, deleteException); + } + + throw new IOException("Could not flush and close the file system " + + "output stream to " + path + " in order to obtain the " + + "stream state handle", e); + } + finally { + closed = true; + } + } + else { + throw new IOException("Stream has already been closed and discarded."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index f993786..609ef69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -29,6 +29,8 @@ import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Arrays; import java.util.UUID; @@ -115,8 +117,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { // ------------------------------------------------------------------------ @Override - public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception { - Path target = scope == CheckpointedStateScope.EXCLUSIVE ? 
checkpointDirectory : sharedStateDirectory; + public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException { + + + Path target = scope == CheckpointedStateScope.EXCLUSIVE ?checkpointDirectory: sharedStateDirectory; int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold); return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold); @@ -275,6 +279,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { } } + @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { // check if there was nothing ever written http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 58791e2..637effd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -32,7 +32,9 @@ import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.util.TernaryBoolean; @@ -102,7 +104,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur * A value of 'undefined' means not yet 
configured, in which case the default will be used. */ private final TernaryBoolean asynchronousSnapshots; - // ------------------------------------------------------------------------ + // ----------------------------------------------------------------------- /** * Creates a new state backend that stores its checkpoint data in the file system and location @@ -451,7 +453,10 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + TaskKvStateRegistry kvStateRegistry) { + + TaskStateManager taskStateManager = env.getTaskStateManager(); + LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig(); return new HeapKeyedStateBackend<>( kvStateRegistry, @@ -460,13 +465,14 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur numberOfKeyGroups, keyGroupRange, isUsingAsynchronousSnapshots(), - env.getExecutionConfig()); + env.getExecutionConfig(), + localRecoveryConfig); } @Override public OperatorStateBackend createOperatorStateBackend( Environment env, - String operatorIdentifier) throws Exception { + String operatorIdentifier) { return new DefaultOperatorStateBackend( env.getUserClassLoader(), http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index aa7ea6e..5d5f716 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -49,8 +49,10 @@ 
import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; @@ -117,13 +119,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final boolean asynchronousSnapshots; public HeapKeyedStateBackend( - TaskKvStateRegistry kvStateRegistry, - TypeSerializer<K> keySerializer, - ClassLoader userCodeClassLoader, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - boolean asynchronousSnapshots, - ExecutionConfig executionConfig) { + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + boolean asynchronousSnapshots, + ExecutionConfig executionConfig, + LocalRecoveryConfig localRecoveryConfig) { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); this.asynchronousSnapshots = asynchronousSnapshots; @@ -286,14 +289,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override @SuppressWarnings("unchecked") - public RunnableFuture<KeyedStateHandle> snapshot( + public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { if (!hasRegisteredState()) { - 
return DoneFuture.nullValue(); + return DoneFuture.of(SnapshotResult.empty()); } long syncStartTime = System.currentTimeMillis(); @@ -326,8 +329,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { //--------------------------------------------------- this becomes the end of sync part // implementation of the async IO operation, based on FutureTask - final AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = - new AbstractAsyncCallableWithResources<KeyedStateHandle>() { + final AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = + new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { CheckpointStreamFactory.CheckpointStateOutputStream stream = null; @@ -359,7 +362,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override - public KeyGroupsStateHandle performOperation() throws Exception { + public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { long asyncStartTime = System.currentTimeMillis(); CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream; @@ -401,15 +404,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle); - return keyGroupsStateHandle; + return SnapshotResult.of(keyGroupsStateHandle); } } - return null; + return SnapshotResult.empty(); } }; - AsyncStoppableTaskWithCallback<KeyedStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable); + AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = AsyncStoppableTaskWithCallback.from(ioCallable); if (!asynchronousSnapshots) { task.run(); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 168e4ff..0801429 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StreamStateHandle; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,7 +49,8 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { @Override public CheckpointStateOutputStream createCheckpointStateOutputStream( - CheckpointedStateScope scope) throws Exception { + CheckpointedStateScope scope) throws IOException + { return new MemoryCheckpointOutputStream(maxStateSize); } @@ -114,6 +117,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { } } + @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { if (isEmpty) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 88d7b01..3da60e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.util.TernaryBoolean; @@ -299,13 +300,16 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf @Override public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, JobID jobID, + Environment env, + JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) { + TaskStateManager taskStateManager = env.getTaskStateManager(); + return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, @@ -313,7 +317,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf numberOfKeyGroups, keyGroupRange, isUsingAsynchronousSnapshots(), - env.getExecutionConfig()); + env.getExecutionConfig(), + taskStateManager.createLocalRecoveryConfig()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f4c953d..927bd11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -205,7 +205,7 @@ public 
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.jobManagerTable = taskExecutorServices.getJobManagerTable(); this.jobLeaderService = taskExecutorServices.getJobLeaderService(); this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation(); - this.localStateStoresManager = taskExecutorServices.getTaskStateManager(); + this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore(); this.networkEnvironment = taskExecutorServices.getNetworkEnvironment(); this.jobManagerConnections = new HashMap<>(4); @@ -452,8 +452,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); - final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForTask( + final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( jobId, + tdd.getAllocationId(), taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); @@ -744,6 +745,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { onFatalError(slotNotFoundException); } + // release local state under the allocation id. 
+ localStateStoresManager.releaseLocalStateForAllocationId(allocationId); + // sanity check if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { onFatalError(new Exception("Could not free slot " + slotId)); @@ -1271,6 +1275,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } catch (SlotNotFoundException e) { log.debug("Could not free slot for allocation id {}.", allocationId, e); } + + localStateStoresManager.releaseLocalStateForAllocationId(allocationId); } private void timeoutSlot(AllocationID allocationId, UUID ticket) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 2de1be8..08335b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -303,6 +303,7 @@ public class TaskManagerRunner implements FatalErrorHandler { TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, resourceID, + rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io. 
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), EnvironmentInformation.getMaxJvmHeapMemory()); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index b710b6a..32c7ff7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.QueryableStateUtils; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -58,6 +60,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledThreadPoolExecutor; /** @@ -68,6 +71,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; public class TaskManagerServices { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); + @VisibleForTesting + public static final String 
LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState"; + /** TaskManager services. */ private final TaskManagerLocation taskManagerLocation; private final MemoryManager memoryManager; @@ -78,7 +84,7 @@ public class TaskManagerServices { private final TaskSlotTable taskSlotTable; private final JobManagerTable jobManagerTable; private final JobLeaderService jobLeaderService; - private final TaskExecutorLocalStateStoresManager taskStateManager; + private final TaskExecutorLocalStateStoresManager taskManagerStateStore; TaskManagerServices( TaskManagerLocation taskManagerLocation, @@ -90,7 +96,7 @@ public class TaskManagerServices { TaskSlotTable taskSlotTable, JobManagerTable jobManagerTable, JobLeaderService jobLeaderService, - TaskExecutorLocalStateStoresManager taskStateManager) { + TaskExecutorLocalStateStoresManager taskManagerStateStore) { this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.memoryManager = Preconditions.checkNotNull(memoryManager); @@ -101,7 +107,7 @@ public class TaskManagerServices { this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable); this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService); - this.taskStateManager = Preconditions.checkNotNull(taskStateManager); + this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore); } // -------------------------------------------------------------------------------------------- @@ -144,8 +150,8 @@ public class TaskManagerServices { return jobLeaderService; } - public TaskExecutorLocalStateStoresManager getTaskStateManager() { - return taskStateManager; + public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() { + return taskManagerStateStore; } // -------------------------------------------------------------------------------------------- @@ -160,6 +166,12 @@ public class TaskManagerServices { Exception exception = null; try { + 
taskManagerStateStore.shutdown(); + } catch (Exception e) { + exception = e; + } + + try { memoryManager.shutdown(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); @@ -209,6 +221,7 @@ public class TaskManagerServices { * * @param resourceID resource ID of the task manager * @param taskManagerServicesConfiguration task manager configuration + * @param taskIOExecutor executor for async IO operations. * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory * @param maxJvmHeapMemory the maximum JVM heap size * @return task manager components @@ -217,6 +230,7 @@ public class TaskManagerServices { public static TaskManagerServices fromConfiguration( TaskManagerServicesConfiguration taskManagerServicesConfiguration, ResourceID resourceID, + Executor taskIOExecutor, long freeHeapMemoryWithDefrag, long maxJvmHeapMemory) throws Exception { @@ -256,7 +270,20 @@ public class TaskManagerServices { final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(); + + LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = taskManagerServicesConfiguration.getLocalRecoveryMode(); + + final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); + + final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length]; + + for (int i = 0; i < stateRootDirectoryStrings.length; ++i) { + stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT); + } + + final TaskExecutorLocalStateStoresManager taskStateManager = + new TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, taskIOExecutor); + return new TaskManagerServices( taskManagerLocation, memoryManager, 
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 07cf660..d029bc5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.MathUtils; import org.apache.flink.util.NetUtils; @@ -54,6 +55,8 @@ public class TaskManagerServicesConfiguration { private final String[] tmpDirPaths; + private final String[] localRecoveryStateRootDirectories; + private final int numberOfSlots; private final NetworkEnvironmentConfiguration networkConfig; @@ -75,9 +78,13 @@ public class TaskManagerServicesConfiguration { private final long timerServiceShutdownTimeout; + private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode; + public TaskManagerServicesConfiguration( InetAddress taskManagerAddress, String[] tmpDirPaths, + String[] localRecoveryStateRootDirectories, + LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode, NetworkEnvironmentConfiguration networkConfig, QueryableStateConfiguration queryableStateConfig, int numberOfSlots, @@ -89,6 +96,8 @@ public class TaskManagerServicesConfiguration { 
this.taskManagerAddress = checkNotNull(taskManagerAddress); this.tmpDirPaths = checkNotNull(tmpDirPaths); + this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories); + this.localRecoveryMode = checkNotNull(localRecoveryMode); this.networkConfig = checkNotNull(networkConfig); this.queryableStateConfig = checkNotNull(queryableStateConfig); this.numberOfSlots = checkNotNull(numberOfSlots); @@ -115,6 +124,14 @@ public class TaskManagerServicesConfiguration { return tmpDirPaths; } + public String[] getLocalRecoveryStateRootDirectories() { + return localRecoveryStateRootDirectories; + } + + public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } @@ -185,6 +202,15 @@ public class TaskManagerServicesConfiguration { } final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration); + String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration); + + if (localStateRootDir.length == 0) { + // default to temp dirs. 
+ localStateRootDir = tmpDirs; + } + + LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = + LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration); final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration( configuration, @@ -225,6 +251,8 @@ public class TaskManagerServicesConfiguration { return new TaskManagerServicesConfiguration( remoteAddress, tmpDirs, + localStateRootDir, + localRecoveryMode, networkConfig, queryableStateConfig, slots, http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index c22413e..1ecb47a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -1202,25 +1202,24 @@ public class Task implements Runnable, TaskActions, CheckpointListener { if (executionState == ExecutionState.RUNNING && invokable != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - try { - invokable.notifyCheckpointComplete(checkpointID); - taskStateManager.notifyCheckpointComplete(checkpointID);} - catch (Throwable t) { - if (getExecutionState() == ExecutionState.RUNNING) { - // fail task if checkpoint confirmation failed. - failExternally(new RuntimeException( - "Error while confirming checkpoint", - t)); - } + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + invokable.notifyCheckpointComplete(checkpointID); + taskStateManager.notifyCheckpointComplete(checkpointID); + } catch (Throwable t) { + if (getExecutionState() == ExecutionState.RUNNING) { + // fail task if checkpoint confirmation failed. 
+ failExternally(new RuntimeException( + "Error while confirming checkpoint", + t)); } } - }; - executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + - taskNameWithSubtask); - + } + }; + executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + + taskNameWithSubtask); } else { LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 3173948..f62ef1b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -48,6 +48,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup} import org.apache.flink.runtime.metrics.util.MetricUtils +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.util.EnvironmentInformation @@ -238,6 +239,7 @@ class LocalFlinkMiniCluster( val taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, resourceID, + ioExecutor, EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag, 
EnvironmentInformation.getMaxJvmHeapMemory) @@ -254,6 +256,7 @@ class LocalFlinkMiniCluster( taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment, + taskManagerServices.getTaskManagerStateStore, taskManagerMetricGroup) system.actorOf(props, taskManagerActorName) @@ -318,6 +321,7 @@ class LocalFlinkMiniCluster( memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, + taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, taskManagerMetricGroup: TaskManagerMetricGroup): Props = { TaskManager.getTaskManagerProps( @@ -328,6 +332,7 @@ class LocalFlinkMiniCluster( memoryManager, ioManager, networkEnvironment, + taskManagerLocalStateStoresManager, highAvailabilityServices, taskManagerMetricGroup) } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 485add5..15581b2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,11 +35,11 @@ import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor} -import org.apache.flink.runtime.blob.{BlobCacheService, BlobClient, BlobService} +import org.apache.flink.runtime.blob.BlobCacheService import org.apache.flink.runtime.broadcast.BroadcastVariableManager import org.apache.flink.runtime.clusterframework.BootstrapTools import 
org.apache.flink.runtime.clusterframework.messages.StopCluster -import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.clusterframework.types.{AllocationID, ResourceID} import org.apache.flink.runtime.concurrent.{Executors, FutureUtils} import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor import org.apache.flink.runtime.execution.ExecutionState @@ -126,6 +126,7 @@ class TaskManager( protected val memoryManager: MemoryManager, protected val ioManager: IOManager, protected val network: NetworkEnvironment, + protected val taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, protected val numberOfSlots: Int, protected val highAvailabilityServices: HighAvailabilityServices, protected val taskManagerMetricGroup: TaskManagerMetricGroup) @@ -253,6 +254,12 @@ class TaskManager( } try { + taskManagerLocalStateStoresManager.shutdown() + } catch { + case t: Exception => log.error("Task state manager did not shutdown properly.", t) + } + + try { fileCache.shutdown() } catch { case t: Exception => log.error("FileCache did not shutdown properly.", t) @@ -474,7 +481,7 @@ class TaskManager( log.debug(s"Cannot find task to stop for execution ${executionID})") sender ! 
decorateMessage(Acknowledge.get()) } - + // cancels a task case CancelTask(executionID) => val task = runningTasks.get(executionID) @@ -984,7 +991,7 @@ class TaskManager( log.error(message, e) throw new RuntimeException(message, e) } - + // watch job manager to detect when it dies context.watch(jobManager) @@ -1070,7 +1077,7 @@ class TaskManager( // clear the key-value location oracle proxy.updateKvStateLocationOracle(HighAvailabilityServices.DEFAULT_JOB_ID, null) } - + // failsafe shutdown of the metrics registry try { taskManagerMetricGroup.close() @@ -1195,18 +1202,21 @@ class TaskManager( config.getTimeout().getSize(), config.getTimeout().getUnit())) - // TODO: wire this so that the manager survives the end of the task - val taskExecutorLocalStateStoresManager = new TaskExecutorLocalStateStoresManager + val jobID = jobInformation.getJobId - val localStateStore = taskExecutorLocalStateStoresManager.localStateStoreForTask( - jobInformation.getJobId, + // Allocation ids do not work properly without flip-6, so we just fake one, based on the jid. 
+ val fakeAllocationID = new AllocationID(jobID.getLowerPart, jobID.getUpperPart) + + val taskLocalStateStore = taskManagerLocalStateStoresManager.localStateStoreForSubtask( + jobID, + fakeAllocationID, taskInformation.getJobVertexId, tdd.getSubtaskIndex) - val slotStateManager = new TaskStateManagerImpl( - jobInformation.getJobId, + val taskStateManager = new TaskStateManagerImpl( + jobID, tdd.getExecutionAttemptId, - localStateStore, + taskLocalStateStore, tdd.getTaskRestore, checkpointResponder) @@ -1224,7 +1234,7 @@ class TaskManager( ioManager, network, bcVarManager, - slotStateManager, + taskStateManager, taskManagerConnection, inputSplitProvider, checkpointResponder, @@ -2013,6 +2023,7 @@ object TaskManager { val taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, resourceID, + actorSystem.dispatcher, EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag, EnvironmentInformation.getMaxJvmHeapMemory) @@ -2030,6 +2041,7 @@ object TaskManager { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), + taskManagerServices.getTaskManagerStateStore(), highAvailabilityServices, taskManagerMetricGroup) @@ -2047,6 +2059,7 @@ object TaskManager { memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, + taskStateManager: TaskExecutorLocalStateStoresManager, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup: TaskManagerMetricGroup ): Props = { @@ -2058,6 +2071,7 @@ object TaskManager { memoryManager, ioManager, networkEnvironment, + taskStateManager, taskManagerConfig.getNumberSlots(), highAvailabilityServices, taskManagerMetricGroup) http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index cbfe0ed..32b32cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -25,8 +25,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.util.TestLogger; @@ -93,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { KeyedStateHandle managedKeyedHandle = mock(KeyedStateHandle.class); KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class); - OperatorStateHandle managedOpHandle = mock(OperatorStateHandle.class); - OperatorStateHandle rawOpHandle = mock(OperatorStateHandle.class); + OperatorStateHandle managedOpHandle = mock(OperatorStreamStateHandle.class); + OperatorStateHandle rawOpHandle = mock(OperatorStreamStateHandle.class); final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState( managedOpHandle, http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index c791fd8..1b2062a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateHandleID; @@ -52,7 +53,6 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; -import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -62,7 +62,6 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; @@ -2739,7 +2738,7 @@ public class CheckpointCoordinatorTest extends TestLogger { metaInfoMap.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{121, 143, 147}, OperatorStateHandle.Mode.BROADCAST)); // this is what a single task will return - 
OperatorStateHandle osh = new OperatorStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new byte[150])); + OperatorStateHandle osh = new OperatorStreamStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new byte[150])); OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; List<Collection<OperatorStateHandle>> repartitionedStates = @@ -2817,7 +2816,7 @@ public class CheckpointCoordinatorTest extends TestLogger { KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0)); - ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare( + ByteStreamStateHandle allSerializedStatesHandle = new ByteStreamStateHandle( String.valueOf(UUID.randomUUID()), serializedDataWithOffsets.f0); @@ -2936,11 +2935,11 @@ public class CheckpointCoordinatorTest extends TestLogger { ++idx; } - ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare( + ByteStreamStateHandle streamStateHandle = new ByteStreamStateHandle( String.valueOf(UUID.randomUUID()), serializationWithOffsets.f0); - return new OperatorStateHandle(offsetsMap, streamStateHandle); + return new OperatorStreamStateHandle(offsetsMap, streamStateHandle); } static ExecutionJobVertex mockExecutionJobVertex( @@ -3265,7 +3264,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ? 
- OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE; + OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE; namedStatesToOffsets.put( "State-" + s, new OperatorStateHandle.StateMetaInfo(offs, mode)); @@ -3282,7 +3281,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } previousParallelOpInstanceStates.add( - new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1))); + new OperatorStreamStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1))); } Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>(); @@ -3769,10 +3768,10 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState( - Collections.<OperatorStateHandle>emptyList(), - Collections.<OperatorStateHandle>emptyList(), - Collections.<KeyedStateHandle>singletonList(managedState), - Collections.<KeyedStateHandle>emptyList())); + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.singleton(managedState), + StateObjectCollection.empty())); Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java index 70794c6..ff787ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import 
org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; -import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; @@ -69,7 +69,7 @@ public class CheckpointMetadataLoadingTest { OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID); OperatorSubtaskState subtaskState = new OperatorSubtaskState( - new OperatorStateHandle( + new OperatorStreamStateHandle( Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])), null, http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index ab353a9..af6ec71 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; @@ -123,10 +122,10 @@ public class CheckpointStateRestoreTest { subtaskStates.putSubtaskStateByOperatorID( OperatorID.fromJobVertexID(statefulId), new OperatorSubtaskState( - Collections.<OperatorStateHandle>emptyList(), - Collections.<OperatorStateHandle>emptyList(), - Collections.singletonList(serializedKeyGroupStates), - Collections.<KeyedStateHandle>emptyList())); + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.singleton(serializedKeyGroupStates), + StateObjectCollection.empty())); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates));
