http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 4898292..123cec2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -51,6 +51,7 @@ import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.stream.Stream; @@ -62,8 +63,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * @param <K> Type of the key by which state is keyed. */ -public abstract class AbstractKeyedStateBackend<K> - implements KeyedStateBackend<K>, Snapshotable<KeyedStateHandle>, Closeable, CheckpointListener { +public abstract class AbstractKeyedStateBackend<K> implements + KeyedStateBackend<K>, + Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>>, + Closeable, + CheckpointListener { /** {@link TypeSerializer} for our key. */ protected final TypeSerializer<K> keySerializer; @@ -110,7 +114,7 @@ public abstract class AbstractKeyedStateBackend<K> KeyGroupRange keyGroupRange, ExecutionConfig executionConfig) { - this.kvStateRegistry = kvStateRegistry; //Preconditions.checkNotNull(kvStateRegistry); + this.kvStateRegistry = kvStateRegistry; this.keySerializer = Preconditions.checkNotNull(keySerializer); this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups); this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
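For context on the interface change above: AbstractKeyedStateBackend now implements Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>>, i.e. a snapshot produces a SnapshotResult wrapper (a job-manager-owned handle plus an optional task-local handle) and restore consumes a collection of handles. A minimal usage sketch of that reshaped contract follows; the helper class, method, and variable names are illustrative assumptions and not part of the commit.

import java.util.concurrent.RunnableFuture;

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;

// Illustrative sketch (assumed helper, not part of the commit): drives the reworked
// snapshot() signature and unwraps the SnapshotResult into its two parts.
final class SnapshotResultUsageSketch {

	static KeyedStateHandle runSnapshot(
			AbstractKeyedStateBackend<?> backend,
			long checkpointId,
			long timestamp,
			CheckpointStreamFactory streamFactory,
			CheckpointOptions options) throws Exception {

		// The backend now returns a future of SnapshotResult<KeyedStateHandle> instead of a bare handle.
		RunnableFuture<SnapshotResult<KeyedStateHandle>> future =
			backend.snapshot(checkpointId, timestamp, streamFactory, options);

		// For synchronous snapshots the future may already be done; otherwise run() executes the I/O part.
		if (!future.isDone()) {
			future.run();
		}

		SnapshotResult<KeyedStateHandle> result = future.get();

		// The task-local copy (if any) only matters for local recovery on the TaskManager side.
		KeyedStateHandle taskLocalCopy = result.getTaskLocalSnapshot(); // may be null

		// The job-manager-owned handle is what gets acknowledged to the checkpoint coordinator.
		return result.getJobManagerOwnedSnapshot(); // may be null for empty state
	}
}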
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java index e8997cd..3c6794e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.core.fs.FSDataOutputStream; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.OutputStream; @@ -38,9 +40,9 @@ public interface CheckpointStreamFactory { * @param scope The state's scope, whether it is exclusive or shared. * @return An output stream that writes state for the given checkpoint. * - * @throws Exception Exceptions may occur while creating the stream and should be forwarded. + * @throws IOException Exceptions may occur while creating the stream and should be forwarded. */ - CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception; + CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException; /** * A dedicated output stream that produces a {@link StreamStateHandle} when closed. @@ -69,6 +71,7 @@ public interface CheckpointStreamFactory { * @return A state handle that can create an input stream producing the data written to this stream. * @throws IOException Thrown, if the stream cannot be closed. */ + @Nullable public abstract StreamStateHandle closeAndGetHandle() throws IOException; /** http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java new file mode 100644 index 0000000..41c35d4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.filesystem.FileBasedStateOutputStream; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * Interface that provides access to a CheckpointStateOutputStream and a method to provide the {@link SnapshotResult}. + * This abstracts from different ways that a result is obtained from checkpoint output streams. + */ +public interface CheckpointStreamWithResultProvider extends Closeable { + + Logger LOG = LoggerFactory.getLogger(CheckpointStreamWithResultProvider.class); + + /** + * Closes the stream and returns a snapshot result with the stream handle(s). + */ + @Nonnull + SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException; + + /** + * Returns the encapsulated output stream. + */ + @Nonnull + CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream(); + + @Override + default void close() throws IOException { + getCheckpointOutputStream().close(); + } + + /** + * Implementation of {@link CheckpointStreamWithResultProvider} that only creates the + * primary/remote/jm-owned state. + */ + class PrimaryStreamOnly implements CheckpointStreamWithResultProvider { + + @Nonnull + private final CheckpointStreamFactory.CheckpointStateOutputStream outputStream; + + public PrimaryStreamOnly(@Nonnull CheckpointStreamFactory.CheckpointStateOutputStream outputStream) { + this.outputStream = outputStream; + } + + @Nonnull + @Override + public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException { + return SnapshotResult.of(outputStream.closeAndGetHandle()); + } + + @Nonnull + @Override + public CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream() { + return outputStream; + } + } + + /** + * Implementation of {@link CheckpointStreamWithResultProvider} that creates both, the + * primary/remote/jm-owned state and the secondary/local/tm-owned state. 
+ */ + class PrimaryAndSecondaryStream implements CheckpointStreamWithResultProvider { + + private static final Logger LOG = LoggerFactory.getLogger(PrimaryAndSecondaryStream.class); + + @Nonnull + private final DuplicatingCheckpointOutputStream outputStream; + + public PrimaryAndSecondaryStream( + @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream primaryOut, + CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut) throws IOException { + this(new DuplicatingCheckpointOutputStream(primaryOut, secondaryOut)); + } + + PrimaryAndSecondaryStream(@Nonnull DuplicatingCheckpointOutputStream outputStream) { + this.outputStream = outputStream; + } + + @Nonnull + @Override + public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException { + + final StreamStateHandle primaryStreamStateHandle; + + try { + primaryStreamStateHandle = outputStream.closeAndGetPrimaryHandle(); + } catch (IOException primaryEx) { + try { + outputStream.close(); + } catch (IOException closeEx) { + primaryEx = ExceptionUtils.firstOrSuppressed(closeEx, primaryEx); + } + throw primaryEx; + } + + StreamStateHandle secondaryStreamStateHandle = null; + + try { + secondaryStreamStateHandle = outputStream.closeAndGetSecondaryHandle(); + } catch (IOException secondaryEx) { + LOG.warn("Exception from secondary/local checkpoint stream.", secondaryEx); + } + + if (primaryStreamStateHandle != null) { + if (secondaryStreamStateHandle != null) { + return SnapshotResult.withLocalState(primaryStreamStateHandle, secondaryStreamStateHandle); + } else { + return SnapshotResult.of(primaryStreamStateHandle); + } + } else { + return SnapshotResult.empty(); + } + } + + @Nonnull + @Override + public DuplicatingCheckpointOutputStream getCheckpointOutputStream() { + return outputStream; + } + } + + @Nonnull + static CheckpointStreamWithResultProvider createSimpleStream( + @Nonnull CheckpointedStateScope checkpointedStateScope, + @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException { + + CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = + primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope); + + return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut); + } + + @Nonnull + static CheckpointStreamWithResultProvider createDuplicatingStream( + @Nonnegative long checkpointId, + @Nonnull CheckpointedStateScope checkpointedStateScope, + @Nonnull CheckpointStreamFactory primaryStreamFactory, + @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException { + + CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = + primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope); + + try { + File outFile = new File( + secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId), + String.valueOf(UUID.randomUUID())); + Path outPath = new Path(outFile.toURI()); + + CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut = + new FileBasedStateOutputStream(outPath.getFileSystem(), outPath); + + return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut); + } catch (IOException secondaryEx) { + LOG.warn("Exception when opening secondary/local checkpoint output stream. 
" + + "Continue only with the primary stream.", secondaryEx); + } + + return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut); + } + + + /** + * Helper method that takes a {@link SnapshotResult<StreamStateHandle>} and a {@link KeyGroupRangeOffsets} and + * creates a {@link SnapshotResult<KeyGroupsStateHandle>} by combining the key groups offsets with all the + * present stream state handles. + */ + @Nonnull + static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult( + @Nonnull SnapshotResult<StreamStateHandle> snapshotResult, + @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) { + + StreamStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot(); + + if (jobManagerOwnedSnapshot != null) { + + KeyedStateHandle jmKeyedState = new KeyGroupsStateHandle(keyGroupRangeOffsets, jobManagerOwnedSnapshot); + StreamStateHandle taskLocalSnapshot = snapshotResult.getTaskLocalSnapshot(); + + if (taskLocalSnapshot != null) { + + KeyedStateHandle localKeyedState = new KeyGroupsStateHandle(keyGroupRangeOffsets, taskLocalSnapshot); + return SnapshotResult.withLocalState(jmKeyedState, localKeyedState); + } else { + + return SnapshotResult.of(jmKeyedState); + } + } else { + + return SnapshotResult.empty(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 266483f..01a397a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -20,10 +20,10 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -45,6 +45,8 @@ import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -133,7 +135,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { public DefaultOperatorStateBackend( ClassLoader userClassLoader, ExecutionConfig executionConfig, - boolean asynchronousSnapshots) throws IOException { + boolean asynchronousSnapshots) { this.closeStreamOnCancelRegistry = new CloseableRegistry(); this.userClassloader = Preconditions.checkNotNull(userClassLoader); @@ -299,7 +301,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { // ------------------------------------------------------------------------------------------- @Override - public RunnableFuture<OperatorStateHandle> snapshot( + public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot( final long 
checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, @@ -308,7 +310,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { final long syncStartTime = System.currentTimeMillis(); if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) { - return DoneFuture.nullValue(); + return DoneFuture.of(SnapshotResult.empty()); } final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies = @@ -346,8 +348,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } // implementation of the async IO operation, based on FutureTask - final AbstractAsyncCallableWithResources<OperatorStateHandle> ioCallable = - new AbstractAsyncCallableWithResources<OperatorStateHandle>() { + final AbstractAsyncCallableWithResources<SnapshotResult<OperatorStateHandle>> ioCallable = + new AbstractAsyncCallableWithResources<SnapshotResult<OperatorStateHandle>>() { CheckpointStreamFactory.CheckpointStateOutputStream out = null; @@ -357,12 +359,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } @Override - protected void releaseResources() throws Exception { + protected void releaseResources() { closeOutStream(); } @Override - protected void stopOperation() throws Exception { + protected void stopOperation() { closeOutStream(); } @@ -377,8 +379,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } } + @Nonnull @Override - public OperatorStateHandle performOperation() throws Exception { + public SnapshotResult<OperatorStateHandle> performOperation() throws Exception { long asyncStartTime = System.currentTimeMillis(); CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out; @@ -444,7 +447,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { StreamStateHandle stateHandle = out.closeAndGetHandle(); if (stateHandle != null) { - retValue = new OperatorStateHandle(writtenStatesMetaData, stateHandle); + retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle); } } @@ -453,11 +456,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); } - return retValue; + return SnapshotResult.of(retValue); } }; - AsyncStoppableTaskWithCallback<OperatorStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable); + AsyncStoppableTaskWithCallback<SnapshotResult<OperatorStateHandle>> task = + AsyncStoppableTaskWithCallback.from(ioCallable); if (!asynchronousSnapshots) { task.run(); @@ -469,10 +473,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { return task; } - @Override public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception { - if (null == restoreSnapshots) { + if (null == restoreSnapshots || restoreSnapshots.isEmpty()) { return; } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java new file mode 100644 index 0000000..fc74638 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java @@ -0,0 +1,109 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +/** + * This class is a keyed state handle based on a directory. It combines a {@link DirectoryStateHandle} and a + * {@link KeyGroupRange}. + */ +public class DirectoryKeyedStateHandle implements KeyedStateHandle { + + private static final long serialVersionUID = 1L; + + /** The directory state handle. */ + @Nonnull + private final DirectoryStateHandle directoryStateHandle; + + /** The key-group range. */ + @Nonnull + private final KeyGroupRange keyGroupRange; + + public DirectoryKeyedStateHandle( + @Nonnull DirectoryStateHandle directoryStateHandle, + @Nonnull KeyGroupRange keyGroupRange) { + + this.directoryStateHandle = directoryStateHandle; + this.keyGroupRange = keyGroupRange; + } + + @Nonnull + public DirectoryStateHandle getDirectoryStateHandle() { + return directoryStateHandle; + } + + @Nonnull + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Override + public void discardState() throws Exception { + directoryStateHandle.discardState(); + } + + @Override + public long getStateSize() { + return directoryStateHandle.getStateSize(); + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange otherKeyGroupRange) { + return this.keyGroupRange.getIntersection(otherKeyGroupRange).getNumberOfKeyGroups() > 0 ? this : null; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + // Nothing to do, this is for local use only. 
+ } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DirectoryKeyedStateHandle that = (DirectoryKeyedStateHandle) o; + + if (!getDirectoryStateHandle().equals(that.getDirectoryStateHandle())) { + return false; + } + return getKeyGroupRange().equals(that.getKeyGroupRange()); + } + + @Override + public int hashCode() { + int result = getDirectoryStateHandle().hashCode(); + result = 31 * result + getKeyGroupRange().hashCode(); + return result; + } + + @Override + public String toString() { + return "DirectoryKeyedStateHandle{" + + "directoryStateHandle=" + directoryStateHandle + + ", keyGroupRange=" + keyGroupRange + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java new file mode 100644 index 0000000..2d08777 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import javax.annotation.Nonnull; + +import java.io.IOException; + +/** + * This state handle represents a directory. This class is, for example, used to represent the directory of RocksDB's + * native checkpoint directories for local recovery. + */ +public class DirectoryStateHandle implements StateObject { + + /** Serial version. */ + private static final long serialVersionUID = 1L; + + /** The path that describes the directory. */ + @Nonnull + private final Path directory; + + public DirectoryStateHandle(@Nonnull Path directory) { + this.directory = directory; + } + + @Override + public void discardState() throws IOException { + FileSystem fileSystem = directory.getFileSystem(); + fileSystem.delete(directory, true); + } + + @Override + public long getStateSize() { + // For now, we will not report any size, but in the future this could (if needed) return the total dir size. 
+ return 0L; // unknown + } + + @Nonnull + public Path getDirectory() { + return directory; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DirectoryStateHandle that = (DirectoryStateHandle) o; + + return directory.equals(that.directory); + } + + @Override + public int hashCode() { + return directory.hashCode(); + } + + @Override + public String toString() { + return "DirectoryStateHandle{" + + "directory=" + directory + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java index d2d808d..556212f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java @@ -17,11 +17,10 @@ */ package org.apache.flink.runtime.state; -import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * A {@link Future} that is always done and will just yield the object that was given at creation @@ -31,11 +30,10 @@ import java.util.concurrent.TimeoutException; */ public class DoneFuture<T> implements RunnableFuture<T> { - private static final DoneFuture<?> NULL_FUTURE = new DoneFuture<Object>(null); - + @Nullable private final T payload; - public DoneFuture(T payload) { + protected DoneFuture(@Nullable T payload) { this.payload = payload; } @@ -55,14 +53,14 @@ public class DoneFuture<T> implements RunnableFuture<T> { } @Override - public T get() throws InterruptedException, ExecutionException { + public T get() { return payload; } @Override public T get( long timeout, - TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + TimeUnit unit) { return get(); } @@ -71,8 +69,8 @@ public class DoneFuture<T> implements RunnableFuture<T> { } - @SuppressWarnings("unchecked") - public static <T> DoneFuture<T> nullValue() { - return (DoneFuture<T>) NULL_FUTURE; + + public static <T> DoneFuture<T> of(@Nullable T result) { + return new DoneFuture<>(result); } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java new file mode 100644 index 0000000..259900c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * A CheckpointStateOutputStream that wraps a primary and a secondary CheckpointStateOutputStream and duplicates + * all writes into both streams. This stream applies buffering to reduce the amount of dual-method calling. Furthermore, + * exceptions that happen in interactions with the secondary stream are not exposed, until the user calls + * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions from interactions with the primary stream + * are immediately returned to the user. This class is used to write state for local recovery as a local (secondary) + * copy of the (primary) state snapshot that is written to a (slower but highly-available) remote filesystem. + */ +public class DuplicatingCheckpointOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream { + + /** Default buffer size of 8KB. */ + private static final int DEFAULT_BUFFER_SIZER = 8 * 1024; + + /** Write buffer. */ + private final byte[] buffer; + + /** Position in the write buffer. */ + private int bufferIdx; + + /** + * Primary stream for writing the checkpoint data. Failures from this stream are forwarded. + */ + private final CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream; + + /** + * Secondary stream for writing the checkpoint data. Failures from this stream are not forwarded until + * {@link #closeAndGetSecondaryHandle()}. 
+ */ + private final CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream; + + /** + * Stores a potential exception that occurred while interacting with {@link #secondaryOutputStream} + */ + private Exception secondaryStreamException; + + public DuplicatingCheckpointOutputStream( + CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream, + CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream) throws IOException { + this(primaryOutputStream, secondaryOutputStream, DEFAULT_BUFFER_SIZER); + } + + public DuplicatingCheckpointOutputStream( + CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream, + CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream, + int bufferSize) throws IOException { + + this.primaryOutputStream = Preconditions.checkNotNull(primaryOutputStream); + this.secondaryOutputStream = Preconditions.checkNotNull(secondaryOutputStream); + + this.buffer = new byte[bufferSize]; + this.bufferIdx = 0; + + this.secondaryStreamException = null; + + checkForAlignedStreamPositions(); + } + + @Override + public void write(int b) throws IOException { + + if (buffer.length <= bufferIdx) { + flushInternalBuffer(); + } + + buffer[bufferIdx] = (byte) b; + ++bufferIdx; + } + + @Override + public void write(byte[] b) throws IOException { + + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + + if (buffer.length <= len) { + + flushInternalBuffer(); + writeThroughInternal(b, off, len); + } else { + + if (buffer.length < len + bufferIdx) { + flushInternalBuffer(); + } + + System.arraycopy(b, off, buffer, bufferIdx, len); + bufferIdx += len; + } + } + + @Override + public long getPos() throws IOException { + final long referencePos = primaryOutputStream.getPos(); + return referencePos + bufferIdx; + } + + @Override + public void flush() throws IOException { + + flushInternalBuffer(); + primaryOutputStream.flush(); + + if (secondaryStreamException == null) { + try { + secondaryOutputStream.flush(); + } catch (Exception flushEx) { + handleSecondaryStreamOnException(flushEx); + } + } + } + + @Override + public void sync() throws IOException { + + flushInternalBuffer(); + primaryOutputStream.sync(); + + if (secondaryStreamException == null) { + try { + secondaryOutputStream.sync(); + } catch (Exception syncEx) { + handleSecondaryStreamOnException(syncEx); + } + } + } + + @Override + public void close() throws IOException { + + Exception exCollector = null; + + try { + flushInternalBuffer(); + } catch (Exception flushEx) { + exCollector = flushEx; + } + + try { + primaryOutputStream.close(); + } catch (Exception closeEx) { + exCollector = ExceptionUtils.firstOrSuppressed(closeEx, exCollector); + } + + if (secondaryStreamException == null) { + try { + secondaryOutputStream.close(); + } catch (Exception closeEx) { + handleSecondaryStreamOnException(closeEx); + } + } + + if (exCollector != null) { + throw new IOException("Exception while closing duplicating stream.", exCollector); + } + } + + private void checkForAlignedStreamPositions() throws IOException { + + if (secondaryStreamException != null) { + return; + } + + final long primaryPos = primaryOutputStream.getPos(); + + try { + final long secondaryPos = secondaryOutputStream.getPos(); + + if (primaryPos != secondaryPos) { + handleSecondaryStreamOnException( + new IOException("Stream positions are out of sync between primary stream and secondary stream. 
" + + "Reported positions are " + primaryPos + " (primary) and " + secondaryPos + " (secondary).")); + } + } catch (Exception posEx) { + handleSecondaryStreamOnException(posEx); + } + } + + private void flushInternalBuffer() throws IOException { + + if (bufferIdx > 0) { + writeThroughInternal(buffer, 0, bufferIdx); + bufferIdx = 0; + } + } + + private void writeThroughInternal(byte[] b, int off, int len) throws IOException { + + primaryOutputStream.write(b, off, len); + + if (secondaryStreamException == null) { + try { + secondaryOutputStream.write(b, off, len); + } catch (Exception writeEx) { + handleSecondaryStreamOnException(writeEx); + } + } + } + + private void handleSecondaryStreamOnException(Exception ex) { + + Preconditions.checkState(secondaryStreamException == null, + "Secondary stream already failed from previous exception!"); + + try { + secondaryOutputStream.close(); + } catch (Exception closeEx) { + ex = ExceptionUtils.firstOrSuppressed(closeEx, ex); + } + + secondaryStreamException = Preconditions.checkNotNull(ex); + } + + @Nullable + @Override + public StreamStateHandle closeAndGetHandle() throws IOException { + return closeAndGetPrimaryHandle(); + } + + /** + * Returns the state handle from the {@link #primaryOutputStream}. + */ + public StreamStateHandle closeAndGetPrimaryHandle() throws IOException { + flushInternalBuffer(); + return primaryOutputStream.closeAndGetHandle(); + } + + /** + * Returns the state handle from the {@link #secondaryOutputStream}. Also reports suppressed exceptions from earlier + * interactions with that stream. + */ + public StreamStateHandle closeAndGetSecondaryHandle() throws IOException { + if (secondaryStreamException == null) { + flushInternalBuffer(); + return secondaryOutputStream.closeAndGetHandle(); + } else { + throw new IOException("Secondary stream previously failed exceptionally", secondaryStreamException); + } + } + + public Exception getSecondaryStreamException() { + return secondaryStreamException; + } + + @VisibleForTesting + CheckpointStreamFactory.CheckpointStateOutputStream getPrimaryOutputStream() { + return primaryOutputStream; + } + + @VisibleForTesting + CheckpointStreamFactory.CheckpointStateOutputStream getSecondaryOutputStream() { + return secondaryOutputStream; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java new file mode 100644 index 0000000..f80a8ce --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.util.Set; +import java.util.UUID; + +/** + * State handle for local copies of {@link IncrementalKeyedStateHandle}. Consists of a {@link DirectoryStateHandle} that + * represents the directory of the native RocksDB snapshot, the key groups, and a stream state handle for Flink's state + * meta data file. + */ +public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle { + + private static final long serialVersionUID = 1L; + + /** Id of the checkpoint that created this state handle. */ + @Nonnegative + private final long checkpointId; + + /** UUID to identify the backend which created this state handle. */ + @Nonnull + private final UUID backendIdentifier; + + /** Handle to Flink's state meta data. */ + @Nonnull + private final StreamStateHandle metaDataState; + + /** Set with the ids of all shared state handles created by the checkpoint. */ + @Nonnull + private final Set<StateHandleID> sharedStateHandleIDs; + + public IncrementalLocalKeyedStateHandle( + @Nonnull UUID backendIdentifier, + @Nonnegative long checkpointId, + @Nonnull DirectoryStateHandle directoryStateHandle, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnull StreamStateHandle metaDataState, + @Nonnull Set<StateHandleID> sharedStateHandleIDs) { + + super(directoryStateHandle, keyGroupRange); + this.backendIdentifier = backendIdentifier; + this.checkpointId = checkpointId; + this.metaDataState = metaDataState; + this.sharedStateHandleIDs = sharedStateHandleIDs; + } + + @Nonnull + public StreamStateHandle getMetaDataState() { + return metaDataState; + } + + public long getCheckpointId() { + return checkpointId; + } + + @Nonnull + public UUID getBackendIdentifier() { + return backendIdentifier; + } + + @Nonnull + public Set<StateHandleID> getSharedStateHandleIDs() { + return sharedStateHandleIDs; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + IncrementalLocalKeyedStateHandle that = (IncrementalLocalKeyedStateHandle) o; + + return getMetaDataState().equals(that.getMetaDataState()); + } + + @Override + public void discardState() throws Exception { + + Exception collectedEx = null; + + try { + super.discardState(); + } catch (Exception e) { + collectedEx = e; + } + + try { + metaDataState.discardState(); + } catch (Exception e) { + collectedEx = ExceptionUtils.firstOrSuppressed(e, collectedEx); + } + + if (collectedEx != null) { + throw collectedEx; + } + } + + @Override + public long getStateSize() { + return super.getStateSize() + metaDataState.getStateSize(); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + getMetaDataState().hashCode(); + return result; + } + + @Override + public String toString() { + return "IncrementalLocalKeyedStateHandle{" + + "metaDataState=" + metaDataState + + "} " + 
super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index cbe40ee..3326a81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.util.Disposable; import java.util.stream.Stream; @@ -30,7 +31,7 @@ import java.util.stream.Stream; * * @param <K> The key by which state is keyed. */ -public interface KeyedStateBackend<K> extends InternalKeyContext<K> { +public interface KeyedStateBackend<K> extends InternalKeyContext<K>, Disposable { /** * Sets the current key that is used for partitioned state. @@ -102,8 +103,6 @@ public interface KeyedStateBackend<K> extends InternalKeyContext<K> { TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception; - /** - * Closes the backend and releases all resources. - */ + @Override void dispose(); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java new file mode 100644 index 0000000..c97fa0b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +/** + * This class encapsulates the completed configuration for local recovery, i.e. the root + * directories into which all file-based snapshots can be written and the general mode for the local recover feature. + */ +public class LocalRecoveryConfig { + + /** + * Enum over modes of local recovery: + * <p><ul> + * <li>DISABLED: disables local recovery. 
+ * <li>ENABLE_FILE_BASED: enables local recovery in a variant that is based on local files. + * </ul> + */ + public enum LocalRecoveryMode { + DISABLED, + ENABLE_FILE_BASED; + + /** + * Extracts the {@link LocalRecoveryMode} from the given configuration. Defaults to LocalRecoveryMode.DISABLED + * if no configuration value is specified or parsing the value resulted in an exception. + * + * @param configuration the configuration that specifies the value for the local recovery mode. + * @return the local recovery mode as found in the config, or LocalRecoveryMode.DISABLED if no mode was + * configured or the specified mode could not be parsed. + */ + @Nonnull + public static LocalRecoveryMode fromConfig(@Nonnull Configuration configuration) { + String localRecoveryConfString = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY); + try { + return LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString); + } catch (IllegalArgumentException ex) { + LoggerFactory.getLogger(LocalRecoveryConfig.class).warn( + "Exception while parsing configuration of local recovery mode. Local recovery will be disabled.", + ex); + return LocalRecoveryConfig.LocalRecoveryMode.DISABLED; + } + } + } + + /** The local recovery mode. */ + @Nonnull + private final LocalRecoveryMode localRecoveryMode; + + /** Encapsulates the root directories and the subtask-specific path. */ + @Nonnull + private final LocalRecoveryDirectoryProvider localStateDirectories; + + public LocalRecoveryConfig( + @Nonnull LocalRecoveryMode localRecoveryMode, + @Nonnull LocalRecoveryDirectoryProvider directoryProvider) { + this.localRecoveryMode = localRecoveryMode; + this.localStateDirectories = directoryProvider; + } + + @Nonnull + public LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + @Nonnull + public LocalRecoveryDirectoryProvider getLocalStateDirectoryProvider() { + return localStateDirectories; + } + + @Override + public String toString() { + return "LocalRecoveryConfig{" + + "localRecoveryMode=" + localRecoveryMode + + ", localStateDirectories=" + localStateDirectories + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java new file mode 100644 index 0000000..3f7ab5c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import java.io.File; +import java.io.Serializable; + +/** + * Provides directories for local recovery. It offers access to the allocation base directories (i.e. the root + * directories for all local state that is created under the same allocation id) and the subtask-specific paths, which + * contain the local state for one subtask. Access by checkpoint id rotates over all root directory indexes, in case + * that there is more than one. Selection methods are provided to pick the directory under a certain index. Directory + * structures are of the following shape: + * + * <p><blockquote><pre> + * |-----allocationBaseDirectory------| + * |-----subtaskBaseDirectory--------------------------------------| + * |-----subtaskSpecificCheckpointDirectory------------------------------| + * + * ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state) + * ../local_state_root_2/allocation_id/job_id/vertex_id_subtask_idx/chk_2/(state) + * + * (...) + * </pre></blockquote><p> + */ +public interface LocalRecoveryDirectoryProvider extends Serializable { + + /** + * Returns the local state allocation base directory for the given checkpoint id w.r.t. our rotation + * over all available allocation base directories. + */ + File allocationBaseDirectory(long checkpointId); + + /** + * Returns the local state directory for the owning subtask and the given checkpoint id w.r.t. our rotation over all + * available allocation base directories. This directory is contained in the directory returned by + * {@link #allocationBaseDirectory(long)} for the same checkpoint id. + */ + File subtaskBaseDirectory(long checkpointId); + + /** + * Returns the local state directory for the specific operator subtask and the given checkpoint id w.r.t. our + * rotation over all available root dirs. This directory is contained in the directory returned by + * {@link #subtaskBaseDirectory(long)} for the same checkpoint id. + */ + File subtaskSpecificCheckpointDirectory(long checkpointId); + + /** + * Returns a specific allocation base directory. The index must be between 0 (incl.) and + * {@link #allocationBaseDirsCount()} (excl.). + */ + File selectAllocationBaseDirectory(int idx); + + /** + * Returns a specific subtask base directory. The index must be between 0 (incl.) and + * {@link #allocationBaseDirsCount()} (excl.). This directory is a direct child of + * {@link #selectAllocationBaseDirectory(int)} given the same index. + */ + File selectSubtaskBaseDirectory(int idx); + + /** + * Returns the total number of allocation base directories. + */ + int allocationBaseDirsCount(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java new file mode 100644 index 0000000..ef50929 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.File; +import java.nio.file.Paths; +import java.util.Arrays; + +/** + * Implementation of {@link LocalRecoveryDirectoryProvider}. + */ +public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirectoryProvider { + + /** Serial version. */ + private static final long serialVersionUID = 1L; + + /** Logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(LocalRecoveryDirectoryProviderImpl.class); + + /** All available root directories that this can potentially deliver. */ + @Nonnull + private final File[] allocationBaseDirs; + + /** JobID of the owning job. */ + @Nonnull + private final JobID jobID; + + /** JobVertexID of the owning task. */ + @Nonnull + private final JobVertexID jobVertexID; + + /** Index of the owning subtask. */ + @Nonnegative + private final int subtaskIndex; + + public LocalRecoveryDirectoryProviderImpl( + File allocationBaseDir, + @Nonnull JobID jobID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex) { + this(new File[]{allocationBaseDir}, jobID, jobVertexID, subtaskIndex); + } + + public LocalRecoveryDirectoryProviderImpl( + @Nonnull File[] allocationBaseDirs, + @Nonnull JobID jobID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex) { + + Preconditions.checkArgument(allocationBaseDirs.length > 0); + this.allocationBaseDirs = allocationBaseDirs; + this.jobID = jobID; + this.jobVertexID = jobVertexID; + this.subtaskIndex = subtaskIndex; + + for (File allocationBaseDir : allocationBaseDirs) { + Preconditions.checkNotNull(allocationBaseDir); + allocationBaseDir.mkdirs(); + } + } + + @Override + public File allocationBaseDirectory(long checkpointId) { + return selectAllocationBaseDirectory((((int) checkpointId) & Integer.MAX_VALUE) % allocationBaseDirs.length); + } + + @Override + public File subtaskBaseDirectory(long checkpointId) { + return new File(allocationBaseDirectory(checkpointId), subtaskDirString()); + } + + @Override + public File subtaskSpecificCheckpointDirectory(long checkpointId) { + return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId)); + } + + @Override + public File selectAllocationBaseDirectory(int idx) { + return allocationBaseDirs[idx]; + } + + @Override + public File selectSubtaskBaseDirectory(int idx) { + return new File(selectAllocationBaseDirectory(idx), subtaskDirString()); + } + + @Override + public int allocationBaseDirsCount() { + return allocationBaseDirs.length; + } + + @Override + public String toString() { + return "LocalRecoveryDirectoryProvider{" + + 
"rootDirectories=" + Arrays.toString(allocationBaseDirs) + + ", jobID=" + jobID + + ", jobVertexID=" + jobVertexID + + ", subtaskIndex=" + subtaskIndex + + '}'; + } + + @VisibleForTesting + String subtaskDirString() { + return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString(); + } + + @VisibleForTesting + String checkpointDirString(long checkpointId) { + return "chk_" + checkpointId; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java deleted file mode 100644 index 1960c1c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.core.fs.AbstractMultiFSDataInputStream; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -/** - * Wrapper class that takes multiple {@link StreamStateHandle} and makes them look like a single one. This is done by - * providing a contiguous view on all the streams of the inner handles through a wrapper stream and by summing up all - * all the meta data. 
- */ -public class MultiStreamStateHandle implements StreamStateHandle { - - private static final long serialVersionUID = -4588701089489569707L; - private final List<StreamStateHandle> stateHandles; - private final long stateSize; - - public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) { - this.stateHandles = Preconditions.checkNotNull(stateHandles); - long calculateSize = 0L; - for(StreamStateHandle stateHandle : stateHandles) { - calculateSize += stateHandle.getStateSize(); - } - this.stateSize = calculateSize; - } - - @Override - public FSDataInputStream openInputStream() throws IOException { - return new MultiFSDataInputStream(stateHandles); - } - - @Override - public void discardState() throws Exception { - StateUtil.bestEffortDiscardAllStateObjects(stateHandles); - } - - @Override - public long getStateSize() { - return stateSize; - } - - @Override - public String toString() { - return "MultiStreamStateHandle{" + - "stateHandles=" + stateHandles + - ", stateSize=" + stateSize + - '}'; - } - - static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream { - - private final TreeMap<Long, StreamStateHandle> stateHandleMap; - - public MultiFSDataInputStream(List<StreamStateHandle> stateHandles) throws IOException { - this.stateHandleMap = new TreeMap<>(); - this.totalPos = 0L; - long calculateSize = 0L; - for (StreamStateHandle stateHandle : stateHandles) { - stateHandleMap.put(calculateSize, stateHandle); - calculateSize += stateHandle.getStateSize(); - } - this.totalAvailable = calculateSize; - - if (totalAvailable > 0L) { - StreamStateHandle first = stateHandleMap.firstEntry().getValue(); - delegate = first.openInputStream(); - } - } - - @Override - protected FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException { - Map.Entry<Long, StreamStateHandle> handleEntry = stateHandleMap.floorEntry(globalStreamOffset); - if (handleEntry != null) { - FSDataInputStream stream = handleEntry.getValue().openInputStream(); - stream.seek(globalStreamOffset - handleEntry.getKey()); - return stream; - } - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java index aee5226..3cbb351 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java @@ -19,19 +19,22 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.util.Disposable; import java.io.Closeable; +import java.util.Collection; /** * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface * {@link Snapshotable} * */ -public interface OperatorStateBackend extends OperatorStateStore, Snapshotable<OperatorStateHandle>, Closeable { +public interface OperatorStateBackend extends + OperatorStateStore, + Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>, + Closeable, + Disposable { - /** - * Disposes the backend and releases all resources. 
- */ + @Override void dispose(); - } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java index 036aed0..ba28631 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java @@ -71,14 +71,14 @@ public final class OperatorStateCheckpointOutputStream OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo( partitionOffsets.toArray(), - OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); + OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); offsetsMap.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, metaInfo); - return new OperatorStateHandle(offsetsMap, streamStateHandle); + return new OperatorStreamStateHandle(offsetsMap, streamStateHandle); } public int getNumberOfPartitions() { return partitionOffsets.size(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java index f9427ef..1ebdaff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java @@ -27,107 +27,39 @@ import java.util.Arrays; import java.util.Map; /** - * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a - * map that contains the offsets to the partitions of named states in the stream. + * Interface of a state handle for operator state. */ -public class OperatorStateHandle implements StreamStateHandle { +public interface OperatorStateHandle extends StreamStateHandle { /** - * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore. + * Returns a map of meta data for all contained states by their name. */ - public enum Mode { - SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. - UNION, // The operator state partitions are UNION-ed upon restoring and sent to all tasks. - BROADCAST // The operator states are identical, as the state is produced from a broadcast stream. - } - - private static final long serialVersionUID = 35876522969227335L; + Map<String, StateMetaInfo> getStateNameToPartitionOffsets(); /** - * unique state name -> offsets for available partitions in the handle stream + * Returns an input stream to read the operator state information. 
*/ - private final Map<String, StateMetaInfo> stateNameToPartitionOffsets; - private final StreamStateHandle delegateStateHandle; - - public OperatorStateHandle( - Map<String, StateMetaInfo> stateNameToPartitionOffsets, - StreamStateHandle delegateStateHandle) { - - this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle); - this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets); - } - - public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() { - return stateNameToPartitionOffsets; - } - - @Override - public void discardState() throws Exception { - delegateStateHandle.discardState(); - } - - @Override - public long getStateSize() { - return delegateStateHandle.getStateSize(); - } - @Override - public FSDataInputStream openInputStream() throws IOException { - return delegateStateHandle.openInputStream(); - } - - public StreamStateHandle getDelegateStateHandle() { - return delegateStateHandle; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (!(o instanceof OperatorStateHandle)) { - return false; - } - - OperatorStateHandle that = (OperatorStateHandle) o; + FSDataInputStream openInputStream() throws IOException; - if (stateNameToPartitionOffsets.size() != that.stateNameToPartitionOffsets.size()) { - return false; - } - - for (Map.Entry<String, StateMetaInfo> entry : stateNameToPartitionOffsets.entrySet()) { - if (!entry.getValue().equals(that.stateNameToPartitionOffsets.get(entry.getKey()))) { - return false; - } - } - - return delegateStateHandle.equals(that.delegateStateHandle); - } - - @Override - public int hashCode() { - int result = delegateStateHandle.hashCode(); - for (Map.Entry<String, StateMetaInfo> entry : stateNameToPartitionOffsets.entrySet()) { - - int entryHash = entry.getKey().hashCode(); - if (entry.getValue() != null) { - entryHash += entry.getValue().hashCode(); - } - result = 31 * result + entryHash; - } - return result; - } + /** + * Returns the underlying stream state handle that points to the state data. + */ + StreamStateHandle getDelegateStateHandle(); - @Override - public String toString() { - return "OperatorStateHandle{" + - "stateNameToPartitionOffsets=" + stateNameToPartitionOffsets + - ", delegateStateHandle=" + delegateStateHandle + - '}'; + /** + * The modes that determine how an {@link OperatorStreamStateHandle} is assigned to tasks during restore. + */ + enum Mode { + SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. + UNION, // The operator state partitions are UNION-ed upon restoring and sent to all tasks. + BROADCAST // The operator states are identical, as the state is produced from a broadcast stream. } - public static class StateMetaInfo implements Serializable { + /** + * Meta information about the operator state handle. 
+ */ + class StateMetaInfo implements Serializable { private static final long serialVersionUID = 3593817615858941166L; @@ -158,10 +90,8 @@ public class OperatorStateHandle implements StreamStateHandle { StateMetaInfo that = (StateMetaInfo) o; - if (!Arrays.equals(getOffsets(), that.getOffsets())) { - return false; - } - return getDistributionMode() == that.getDistributionMode(); + return Arrays.equals(getOffsets(), that.getOffsets()) + && getDistributionMode() == that.getDistributionMode(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java new file mode 100644 index 0000000..3900834 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Map; + +/** + * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a + * map that contains the offsets to the partitions of named states in the stream. 
+ */ +public class OperatorStreamStateHandle implements OperatorStateHandle { + + private static final long serialVersionUID = 35876522969227335L; + + /** + * unique state name -> offsets for available partitions in the handle stream + */ + private final Map<String, StateMetaInfo> stateNameToPartitionOffsets; + private final StreamStateHandle delegateStateHandle; + + public OperatorStreamStateHandle( + Map<String, StateMetaInfo> stateNameToPartitionOffsets, + StreamStateHandle delegateStateHandle) { + + this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle); + this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets); + } + + @Override + public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() { + return stateNameToPartitionOffsets; + } + + @Override + public void discardState() throws Exception { + delegateStateHandle.discardState(); + } + + @Override + public long getStateSize() { + return delegateStateHandle.getStateSize(); + } + + @Override + public FSDataInputStream openInputStream() throws IOException { + return delegateStateHandle.openInputStream(); + } + + @Override + public StreamStateHandle getDelegateStateHandle() { + return delegateStateHandle; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof OperatorStreamStateHandle)) { + return false; + } + + OperatorStreamStateHandle that = (OperatorStreamStateHandle) o; + + if (stateNameToPartitionOffsets.size() != that.stateNameToPartitionOffsets.size()) { + return false; + } + + for (Map.Entry<String, StateMetaInfo> entry : stateNameToPartitionOffsets.entrySet()) { + if (!entry.getValue().equals(that.stateNameToPartitionOffsets.get(entry.getKey()))) { + return false; + } + } + + return delegateStateHandle.equals(that.delegateStateHandle); + } + + @Override + public int hashCode() { + int result = delegateStateHandle.hashCode(); + for (Map.Entry<String, StateMetaInfo> entry : stateNameToPartitionOffsets.entrySet()) { + + int entryHash = entry.getKey().hashCode(); + if (entry.getValue() != null) { + entryHash += entry.getValue().hashCode(); + } + result = 31 * result + entryHash; + } + return result; + } + + @Override + public String toString() { + return "OperatorStateHandle{" + + "stateNameToPartitionOffsets=" + stateNameToPartitionOffsets + + ", delegateStateHandle=" + delegateStateHandle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java new file mode 100644 index 0000000..3c0054f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class represents a directory that is the target for a state snapshot. This class provides some methods that + * simplify resource management when dealing with such directories, e.g. it can produce a {@link DirectoryStateHandle} + * when the snapshot is completed and disposal considers whether or not a snapshot was already completed. For a + * completed snapshot, the ownership for cleanup is transferred to the created directory state handle. For incomplete + * snapshots, calling {@link #cleanup()} will delete the underlying directory resource. + */ +public abstract class SnapshotDirectory { + + /** + * Lifecycle stages of a snapshot directory. + */ + enum State { + ONGOING, COMPLETED, DELETED + } + + /** This path describes the underlying directory for the snapshot. */ + @Nonnull + protected final Path directory; + + /** The filesystem that contains the snapshot directory. */ + @Nonnull + protected final FileSystem fileSystem; + + /** This reference tracks the lifecycle state of the snapshot directory. */ + @Nonnull + protected AtomicReference<State> state; + + private SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem fileSystem) { + this.directory = directory; + this.fileSystem = fileSystem; + this.state = new AtomicReference<>(State.ONGOING); + } + + private SnapshotDirectory(@Nonnull Path directory) throws IOException { + this(directory, directory.getFileSystem()); + } + + @Nonnull + public Path getDirectory() { + return directory; + } + + public boolean mkdirs() throws IOException { + return fileSystem.mkdirs(directory); + } + + @Nonnull + public FileSystem getFileSystem() { + return fileSystem; + } + + public boolean exists() throws IOException { + return fileSystem.exists(directory); + } + + /** + * List the statuses of the files/directories in the snapshot directory. + * + * @return the statuses of the files/directories in the given path. + * @throws IOException if there is a problem creating the file statuses. + */ + public FileStatus[] listStatus() throws IOException { + return fileSystem.listStatus(directory); + } + + /** + * Calling this method will attempt to delete the underlying snapshot directory recursively, if the state is + * "ongoing". In this case, the state will be set to "deleted" as a result of this call. + * + * @return <code>true</code> if delete is successful, <code>false</code> otherwise. + * @throws IOException if an exception happens during the delete. + */ + public boolean cleanup() throws IOException { + return !state.compareAndSet(State.ONGOING, State.DELETED) || fileSystem.delete(directory, true); + } + + /** + * Returns <code>true</code> if the snapshot is marked as completed.
+ */ + public boolean isSnapshotCompleted() { + return State.COMPLETED == state.get(); + } + + /** + * Calling this method completes the snapshot for this snapshot directory, if possible, and creates a corresponding + * {@link DirectoryStateHandle} that points to the snapshot directory. Calling this method can change the + * lifecycle state from ONGOING to COMPLETED if the directory should no longer be deleted in {@link #cleanup()}. This + * method can return <code>null</code> if the directory is temporary and should therefore not be + * referenced in a handle. + * + * @return A directory state handle that points to the snapshot directory. Can return <code>null</code> if the + * directory is temporary and should therefore not be referenced in a handle. + * @throws IOException if the state of this snapshot directory object is different from "ongoing". + */ + @Nullable + public abstract DirectoryStateHandle completeSnapshotAndGetHandle() throws IOException; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SnapshotDirectory that = (SnapshotDirectory) o; + + return directory.equals(that.directory); + } + + @Override + public int hashCode() { + return directory.hashCode(); + } + + @Override + public String toString() { + return "SnapshotDirectory{" + + "directory=" + directory + + ", state=" + state + + '}'; + } + + /** + * Creates a temporary snapshot directory for the given path. This will always return "null" as result of + * {@link #completeSnapshotAndGetHandle()} and always attempt to delete the underlying directory in + * {@link #cleanup()}. + */ + public static SnapshotDirectory temporary(@Nonnull Path directory) throws IOException { + return new TemporarySnapshotDirectory(directory); + } + + /** + * Creates a permanent snapshot directory for the given path, which will not delete the underlying directory in + * {@link #cleanup()} after {@link #completeSnapshotAndGetHandle()} was called. + */ + public static SnapshotDirectory permanent(@Nonnull Path directory) throws IOException { + return new PermanentSnapshotDirectory(directory); + } + + private static class TemporarySnapshotDirectory extends SnapshotDirectory { + + TemporarySnapshotDirectory(@Nonnull Path directory) throws IOException { + super(directory); + } + + @Override + public DirectoryStateHandle completeSnapshotAndGetHandle() { + return null; // We return null so that the directory is not referenced by a state handle.
+ } + } + + private static class PermanentSnapshotDirectory extends SnapshotDirectory { + + PermanentSnapshotDirectory(@Nonnull Path directory) throws IOException { + super(directory); + } + + @Override + public DirectoryStateHandle completeSnapshotAndGetHandle() throws IOException { + if (State.COMPLETED == state.get() || state.compareAndSet(State.ONGOING, State.COMPLETED)) { + return new DirectoryStateHandle(directory); + } else { + throw new IOException("Expected state " + State.ONGOING + " but found state " + state.get()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java new file mode 100644 index 0000000..e292c73 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * This class contains the combined results from the snapshot of a state backend: + * <ul> + * <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li> + * <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li> + * </ul> + * + * Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A local + * state object that is not null also requires a state to report to the job manager that is not null, because the + * Job Manager always owns the ground truth about the checkpointed state. + */ +public class SnapshotResult<T extends StateObject> implements StateObject { + + private static final long serialVersionUID = 1L; + + /** A singleton instance to represent an empty snapshot result. */ + private static final SnapshotResult<?> EMPTY = new SnapshotResult<>(null, null); + + /** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */ + private final T jobManagerOwnedSnapshot; + + /** This is the state snapshot that represents the task-local state for the {@link TaskLocalStateStoreImpl}. */ + private final T taskLocalSnapshot; + + /** + * Creates a {@link SnapshotResult} for the given jobManagerOwnedSnapshot and taskLocalSnapshot. If the + * jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null. + * + * @param jobManagerOwnedSnapshot Snapshot for report to job manager.
Can be null. + * @param taskLocalSnapshot Snapshot for report to the local state manager. This is optional and, if not + * null, requires jobManagerOwnedSnapshot to also be not null. + */ + private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) { + + if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) { + throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!"); + } + + this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot; + this.taskLocalSnapshot = taskLocalSnapshot; + } + + public T getJobManagerOwnedSnapshot() { + return jobManagerOwnedSnapshot; + } + + public T getTaskLocalSnapshot() { + return taskLocalSnapshot; + } + + @Override + public void discardState() throws Exception { + + Exception aggregatedExceptions = null; + + if (jobManagerOwnedSnapshot != null) { + try { + jobManagerOwnedSnapshot.discardState(); + } catch (Exception remoteDiscardEx) { + aggregatedExceptions = remoteDiscardEx; + } + } + + if (taskLocalSnapshot != null) { + try { + taskLocalSnapshot.discardState(); + } catch (Exception localDiscardEx) { + aggregatedExceptions = ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions); + } + } + + if (aggregatedExceptions != null) { + throw aggregatedExceptions; + } + } + + @Override + public long getStateSize() { + return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L; + } + + @SuppressWarnings("unchecked") + public static <T extends StateObject> SnapshotResult<T> empty() { + return (SnapshotResult<T>) EMPTY; + } + + public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) { + return jobManagerState != null ? new SnapshotResult<>(jobManagerState, null) : empty(); + } + + public static <T extends StateObject> SnapshotResult<T> withLocalState( + @Nonnull T jobManagerState, + @Nonnull T localState) { + return new SnapshotResult<>(jobManagerState, localState); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java new file mode 100644 index 0000000..9139fa7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.CheckpointOptions; + +import java.util.concurrent.RunnableFuture; + +/** + * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at + * least threadsafe, i.e. this is a functional interface and can be called in parallel by multiple checkpoints. + * + * @param <S> type of the returned state object that represents the result of the snapshot operation. + */ +@FunctionalInterface +public interface SnapshotStrategy<S extends StateObject> { + + /** + * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and + * returns a {@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if + * the operation is performed synchronously or asynchronously. In the latter case, the returned Runnable must be executed + * first before obtaining the handle. + * + * @param checkpointId The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @param streamFactory The factory that we can use for writing our state to streams. + * @param checkpointOptions Options for how to perform this checkpoint. + * @return A runnable future that will yield a {@link StateObject}. + */ + RunnableFuture<S> performSnapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java index c7e62f0..733339f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java @@ -20,15 +20,15 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import java.util.Collection; import java.util.concurrent.RunnableFuture; /** * Interface for operators that can perform snapshots of their state. * * @param <S> Generic type of the state object that is created as handle to snapshots. + * @param <R> Generic type of the state object that is used in restore. */ -public interface Snapshotable<S extends StateObject> { +public interface Snapshotable<S extends StateObject, R> { /** * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and @@ -54,5 +54,5 @@ public interface Snapshotable<S extends StateObject> { * * @param state the old state to restore. */ - void restore(Collection<S> state) throws Exception; + void restore(R state) throws Exception; }
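Editor's note: the SnapshotStrategy, SnapshotResult, and Snapshotable changes above tie together: a snapshot now yields a SnapshotResult wrapping a job-manager-owned handle plus an optional task-local handle, and SnapshotStrategy is the functional hook that produces that result. The sketch below is not part of the commit; it is a hedged illustration of how a caller might drive these types. The class name SnapshotStrategyUsageSketch and the method runSnapshot are invented for the example; only the types and method signatures shown in the diffs are assumed.

// Illustrative sketch only -- not part of this commit. Names below are invented for the example.
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;

import java.util.concurrent.RunnableFuture;

public final class SnapshotStrategyUsageSketch {

	public static SnapshotResult<KeyedStateHandle> runSnapshot(
			SnapshotStrategy<SnapshotResult<KeyedStateHandle>> strategy,
			long checkpointId,
			long timestamp,
			CheckpointStreamFactory streamFactory,
			CheckpointOptions checkpointOptions) throws Exception {

		// The strategy decides whether the snapshot is written synchronously or asynchronously;
		// in the asynchronous case the returned runnable performs the actual write.
		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFuture =
			strategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

		// Run the (possibly already completed) future before obtaining the result.
		snapshotFuture.run();
		SnapshotResult<KeyedStateHandle> result = snapshotFuture.get();

		// The job-manager-owned handle is what gets acknowledged for the checkpoint; the
		// task-local handle is optional and only present together with a job manager handle.
		// Both may be null for an empty result.
		KeyedStateHandle jobManagerOwned = result.getJobManagerOwnedSnapshot();
		KeyedStateHandle taskLocal = result.getTaskLocalSnapshot();

		return result;
	}

	private SnapshotStrategyUsageSketch() {
	}
}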

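Editor's note: similarly, a minimal usage sketch for the new SnapshotDirectory API, assuming the factory methods and DirectoryStateHandle constructor shown in the diff. Not part of the commit; SnapshotDirectoryUsageSketch, writeLocalSnapshot, and localSnapshotPath are invented names.

// Illustrative sketch only -- not part of this commit.
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;

import java.io.IOException;

public final class SnapshotDirectoryUsageSketch {

	public static DirectoryStateHandle writeLocalSnapshot(Path localSnapshotPath) throws IOException {

		// A permanent directory keeps its data after completion; ownership for cleanup moves
		// to the DirectoryStateHandle returned on completion.
		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(localSnapshotPath);

		if (!snapshotDirectory.mkdirs()) {
			throw new IOException("Could not create snapshot directory " + localSnapshotPath);
		}

		try {
			// ... write the snapshot files into snapshotDirectory.getDirectory() ...

			// Marks the snapshot as COMPLETED and returns a handle; a temporary directory
			// created via SnapshotDirectory.temporary(...) would return null here instead.
			return snapshotDirectory.completeSnapshotAndGetHandle();
		} finally {
			// Deletes the directory only while the snapshot is still ONGOING (e.g. after a
			// failure above); once COMPLETED, cleanup() leaves the directory in place.
			snapshotDirectory.cleanup();
		}
	}

	private SnapshotDirectoryUsageSketch() {
	}
}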