This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc1d63579ae5bcd9db207e1a5cd1b6365a87e871 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Tue Jan 18 16:12:42 2022 +0100 [FLINK-25194] Implement an API for duplicating artefacts --- .../flink/core/fs/DuplicatingFileSystem.java | 74 +++++++++++++++ .../org/apache/flink/core/fs/EntropyInjector.java | 21 ++++- .../runtime/state/CheckpointStateToolset.java | 54 +++++++++++ .../runtime/state/CheckpointStorageWorkerView.java | 11 +++ .../runtime/state/CheckpointStreamFactory.java | 28 ++++++ .../NotDuplicatingCheckpointStateToolset.java | 37 ++++++++ .../state/filesystem/FsCheckpointStateToolset.java | 91 ++++++++++++++++++ .../filesystem/FsCheckpointStorageAccess.java | 13 +++ .../filesystem/FsCheckpointStreamFactory.java | 60 +++++++++++- .../state/memory/MemCheckpointStreamFactory.java | 14 ++- .../MemoryBackendCheckpointStorageAccess.java | 7 ++ .../runtime/state/ChannelPersistenceITCase.java | 5 + .../state/TestCheckpointStorageWorkerView.java | 5 + ...tingCheckpointStorageAccessCoordinatorView.java | 5 + .../filesystem/FsCheckpointStateToolsetTest.java | 103 +++++++++++++++++++++ .../filesystem/FsCheckpointStorageAccessTest.java | 42 +++++++++ .../state/testutils/BackendForTestStream.java | 21 +++++ .../testutils/TestCheckpointStreamFactory.java | 14 +++ .../state/ttl/mock/MockCheckpointStorage.java | 23 +++++ .../util/BlockerCheckpointStreamFactory.java | 13 +++ .../state/NonCheckpointingStorageAccess.java | 7 ++ .../tasks/SubtaskCheckpointCoordinatorImpl.java | 6 ++ .../UnalignedCheckpointFailureHandlingITCase.java | 30 +++++- 23 files changed, 675 insertions(+), 9 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java new file mode 100644 index 0000000..3313fbc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.IOException; +import java.util.List; + +/** + * An extension interface for {@link FileSystem FileSystems} that can perform cheap DFS side + * duplicate operation. Such an operation can improve the time required for creating cheaply + * independent snapshots from incremental snapshots. + */ +public interface DuplicatingFileSystem { + /** + * Tells if we can perform duplicate/copy between given paths. + * + * <p>This should be a rather cheap operation, preferably not involving any remote accesses. You + * can check e.g. if both paths are on the same host. + * + * @param source The path of the source file to duplicate + * @param destination The path where to duplicate the source file + * @return true, if we can perform the duplication + */ + boolean canFastDuplicate(Path source, Path destination) throws IOException; + + /** + * Duplicates the source path into the destination path. + * + * <p>You should first check if you can duplicate with {@link #canFastDuplicate(Path, Path)}. + * + * @param requests Pairs of src/dst to copy. + */ + void duplicate(List<CopyRequest> requests) throws IOException; + + /** A pair of source and destination to duplicate a file. 
*/ + interface CopyRequest { + /** The path of the source file to duplicate. */ + Path getSource(); + + /** The path where to duplicate the source file. */ + Path getDestination(); + + /** A factory method for creating a simple pair of source/destination. */ + static CopyRequest of(Path source, Path destination) { + return new CopyRequest() { + @Override + public Path getSource() { + return source; + } + + @Override + public Path getDestination() { + return destination; + } + }; + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java index d8e22d2..043d81c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java @@ -46,15 +46,30 @@ public class EntropyInjector { * EntropyInjectingFileSystem#getEntropyInjectionKey()}. * * <p>If the given file system does not implement {@code EntropyInjectingFileSystem}, then this + * method returns the same path. + */ + public static Path addEntropy(FileSystem fs, Path path) throws IOException { + // check and possibly inject entropy into the path + final EntropyInjectingFileSystem efs = getEntropyFs(fs); + return efs == null ? path : resolveEntropy(path, efs, true); + } + + /** + * Handles entropy injection across regular and entropy-aware file systems. + * + * <p>If the given file system is entropy-aware (i.e. it implements {@link + * EntropyInjectingFileSystem}), then this method replaces the entropy marker in the path with + * random characters. The entropy marker is defined by {@link + * EntropyInjectingFileSystem#getEntropyInjectionKey()}. + * + * <p>If the given file system does not implement {@code EntropyInjectingFileSystem}, then this * method delegates to {@link FileSystem#create(Path, WriteMode)} and returns the same path in * the resulting {@code OutputStreamAndPath}. 
*/ public static OutputStreamAndPath createEntropyAware( FileSystem fs, Path path, WriteMode writeMode) throws IOException { - // check and possibly inject entropy into the path - final EntropyInjectingFileSystem efs = getEntropyFs(fs); - final Path processedPath = efs == null ? path : resolveEntropy(path, efs, true); + final Path processedPath = addEntropy(fs, path); // create the stream on the original file system to let the safety net // take its effect diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java new file mode 100644 index 0000000..eb21b9c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.util.List; + +/** + * A toolset of operations that can be performed on a location embedded within the class. Created in + * {@link CheckpointStorageWorkerView}. 
+ */ +@Internal +public interface CheckpointStateToolset { + + /** + * Tells if we can duplicate the given {@link StreamStateHandle}. + * + * <p>This should be a rather cheap operation, preferably not involving any remote accesses. + * + * @param stateHandle The handle to duplicate + * @return true, if we can perform the duplication + */ + boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException; + + /** + * Duplicates {@link StreamStateHandle StreamStateHandles} into the path embedded inside of the + * class. + * + * <p>You should first check if you can duplicate with {@link + * #canFastDuplicate(StreamStateHandle)}. + * + * @param stateHandle The handles to duplicate + * @return The duplicated handles + */ + List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandle) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java index 39d4a74..3e7e659 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.Internal; + import java.io.IOException; /** @@ -27,6 +29,7 @@ import java.io.IOException; * * <p>Methods of this interface act as a worker role in task manager. */ +@Internal public interface CheckpointStorageWorkerView { /** @@ -64,4 +67,12 @@ public interface CheckpointStorageWorkerView { * @throws IOException Thrown, if the stream cannot be opened. */ CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; + + /** + * A complementary method to {@link #createTaskOwnedStateStream()}. Creates a toolset that gives + * access to additional operations that can be performed in the task owned state location. 
+ * + * @return A toolset for additional operations for state owned by tasks. + */ + CheckpointStateToolset createTaskOwnedCheckpointStateToolset(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java index c905438..96c9599 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state; import java.io.IOException; +import java.util.List; /** * A factory for checkpoint output streams, which are used to persist data for checkpoints. @@ -39,4 +40,31 @@ public interface CheckpointStreamFactory { */ CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException; + + /** + * Tells if we can duplicate the given {@link StreamStateHandle} into the path corresponding to + * the given {@link CheckpointedStateScope}. + * + * <p>This should be a rather cheap operation, preferably not involving any remote accesses. + * + * @param stateHandle The handle to duplicate + * @param scope Scope determining the location to duplicate into + * @return true, if we can perform the duplication + */ + boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) + throws IOException; + + /** + * Duplicates {@link StreamStateHandle} into the path corresponding to the given {@link + * CheckpointedStateScope}. + * + * <p>You should first check if you can duplicate with {@link + * #canFastDuplicate(StreamStateHandle, CheckpointedStateScope)}. 
+ * + * @param stateHandles The handles to duplicate + * @param scope Scope determining the location to duplicate into + * @return The duplicated handle + */ + List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java new file mode 100644 index 0000000..354c821 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.List; + +/** An empty implementation of {@link CheckpointStateToolset}. 
*/ +public final class NotDuplicatingCheckpointStateToolset implements CheckpointStateToolset { + @Override + public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException { + return false; + } + + @Override + public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandle) + throws IOException { + throw new UnsupportedEncodingException("The toolset does not support duplication"); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java new file mode 100644 index 0000000..1f8423c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.DuplicatingFileSystem; +import org.apache.flink.core.fs.DuplicatingFileSystem.CopyRequest; +import org.apache.flink.core.fs.EntropyInjector; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStateToolset; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * An implementation of {@link CheckpointStateToolset} that does file based duplicating with a + * {@link DuplicatingFileSystem}. + */ +public class FsCheckpointStateToolset implements CheckpointStateToolset { + + private final Path basePath; + private final DuplicatingFileSystem fs; + + public FsCheckpointStateToolset(Path basePath, DuplicatingFileSystem fs) { + this.basePath = basePath; + this.fs = fs; + } + + @Override + public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException { + if (!(stateHandle instanceof FileStateHandle)) { + return false; + } + final Path srcPath = ((FileStateHandle) stateHandle).getFilePath(); + final Path dst = getNewDstPath(srcPath.getName()); + return fs.canFastDuplicate(srcPath, dst); + } + + @Override + public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles) + throws IOException { + + final List<CopyRequest> requests = new ArrayList<>(); + for (StreamStateHandle handle : stateHandles) { + if (!(handle instanceof FileStateHandle)) { + throw new IllegalArgumentException("We can duplicate only FileStateHandles."); + } + final Path srcPath = ((FileStateHandle) handle).getFilePath(); + requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName()))); + } + fs.duplicate(requests); + + return IntStream.range(0, stateHandles.size()) + .mapToObj( + idx -> { + final StreamStateHandle originalHandle = stateHandles.get(idx); + final Path dst 
= requests.get(idx).getDestination(); + if (originalHandle instanceof RelativeFileStateHandle) { + return new RelativeFileStateHandle( + dst, dst.getName(), originalHandle.getStateSize()); + } else { + return new FileStateHandle(dst, originalHandle.getStateSize()); + } + }) + .collect(Collectors.toList()); + } + + private Path getNewDstPath(String fileName) throws IOException { + final Path dst = new Path(basePath, fileName); + return EntropyInjector.addEntropy(dst.getFileSystem(), dst); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java index 1c3ebb2..af37324 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java @@ -20,12 +20,15 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.DuplicatingFileSystem; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; import javax.annotation.Nullable; @@ -177,6 +180,16 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess } @Override + public CheckpointStateToolset 
createTaskOwnedCheckpointStateToolset() { + if (fileSystem instanceof DuplicatingFileSystem) { + return new FsCheckpointStateToolset( + taskOwnedStateDirectory, (DuplicatingFileSystem) fileSystem); + } else { + return new NotDuplicatingCheckpointStateToolset(); + } + } + + @Override protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) { final CheckpointStorageLocationReference reference = encodePathAsReference(location); return new FsCheckpointStorageLocation( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index 5151130..8774848 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.core.fs.DuplicatingFileSystem; import org.apache.flink.core.fs.EntropyInjector; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; @@ -37,6 +38,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -87,6 +89,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { /** Whether the file system dynamically injects entropy into the file paths. */ private final boolean entropyInjecting; + private final FsCheckpointStateToolset privateStateToolset; + + private final FsCheckpointStateToolset sharedStateToolset; + /** * Creates a new stream factory that stores its checkpoint data in the file system and location * defined by the given Path. 
@@ -130,6 +136,16 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { this.fileStateThreshold = fileStateSizeThreshold; this.writeBufferSize = writeBufferSize; this.entropyInjecting = EntropyInjector.isEntropyInjecting(fileSystem); + if (fileSystem instanceof DuplicatingFileSystem) { + final DuplicatingFileSystem duplicatingFileSystem = (DuplicatingFileSystem) fileSystem; + this.privateStateToolset = + new FsCheckpointStateToolset(checkpointDirectory, duplicatingFileSystem); + this.sharedStateToolset = + new FsCheckpointStateToolset(sharedStateDirectory, duplicatingFileSystem); + } else { + this.privateStateToolset = null; + this.sharedStateToolset = null; + } } // ------------------------------------------------------------------------ @@ -137,10 +153,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { @Override public FsCheckpointStateOutputStream createCheckpointStateOutputStream( CheckpointedStateScope scope) throws IOException { - Path target = - scope == CheckpointedStateScope.EXCLUSIVE - ? checkpointDirectory - : sharedStateDirectory; + Path target = getTargetPath(scope); int bufferSize = Math.max(writeBufferSize, fileStateThreshold); final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED; @@ -148,6 +161,45 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { target, filesystem, bufferSize, fileStateThreshold, !absolutePath); } + private Path getTargetPath(CheckpointedStateScope scope) { + return scope == CheckpointedStateScope.EXCLUSIVE + ? 
checkpointDirectory + : sharedStateDirectory; + } + + @Override + public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) + throws IOException { + if (privateStateToolset == null || sharedStateToolset == null) { + return false; + } + switch (scope) { + case EXCLUSIVE: + return privateStateToolset.canFastDuplicate(stateHandle); + case SHARED: + return sharedStateToolset.canFastDuplicate(stateHandle); + } + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException { + + if (privateStateToolset == null || sharedStateToolset == null) { + throw new IllegalArgumentException("The underlying FS does not support duplication."); + } + + switch (scope) { + case EXCLUSIVE: + return privateStateToolset.duplicate(stateHandles); + case SHARED: + return sharedStateToolset.duplicate(stateHandles); + default: + throw new IllegalArgumentException("Unknown state scope: " + scope); + } + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 98d09be..c9b5088 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -28,13 +28,14 @@ import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import javax.annotation.Nullable; import java.io.IOException; +import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; /** {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. 
*/ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { - /** The maximal size that the snapshotted memory state may have */ + /** The maximal size that the snapshotted memory state may have. */ private final int maxStateSize; /** @@ -54,6 +55,17 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { } @Override + public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException { + throw new UnsupportedOperationException("We can not duplicate handles in memory."); + } + + @Override public String toString() { return "In-Memory Stream Factory"; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java index 10e33a9..46ae6c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java @@ -23,9 +23,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream; @@ -157,6 +159,11 @@ public class MemoryBackendCheckpointStorageAccess extends AbstractFsCheckpointSt } @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + return new NotDuplicatingCheckpointStateToolset(); + } + + @Override protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) { return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java index 8363e2e..f9cc467 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java @@ -271,6 +271,11 @@ public class ChannelPersistenceITCase { public CheckpointStateOutputStream createTaskOwnedStateStream() { throw new UnsupportedOperationException(); } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + throw new UnsupportedOperationException(); + } }; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java index 9b8d298..afe955d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java @@ -53,4 +53,9 @@ public class TestCheckpointStorageWorkerView implements CheckpointStorageWorkerV return taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream( taskOwnedStateScope); } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + return new 
NotDuplicatingCheckpointStateToolset(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java index 9e05949..9213a51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java @@ -97,6 +97,11 @@ public class TestingCheckpointStorageAccessCoordinatorView return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE); } + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + return new NotDuplicatingCheckpointStateToolset(); + } + // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java new file mode 100644 index 0000000..d9dafe5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.DuplicatingFileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FsCheckpointStateToolset}. */ +class FsCheckpointStateToolsetTest { + @Test + void testCanDuplicateNonFileStreamHandle() throws IOException { + final FsCheckpointStateToolset stateToolset = + new FsCheckpointStateToolset( + new Path("test-path"), new TestDuplicatingFileSystem()); + + final boolean canFastDuplicate = + stateToolset.canFastDuplicate(new ByteStreamStateHandle("test", new byte[] {})); + assertThat(canFastDuplicate).isFalse(); + } + + @Test + void testCanDuplicate() throws IOException { + final FsCheckpointStateToolset stateToolset = + new FsCheckpointStateToolset( + new Path("test-path"), new TestDuplicatingFileSystem()); + + final boolean canFastDuplicate = + stateToolset.canFastDuplicate( + new FileStateHandle(new Path("old-test-path", "test-file"), 0)); + assertThat(canFastDuplicate).isTrue(); + } + + @Test + void testCannotDuplicate() throws IOException { + final FsCheckpointStateToolset stateToolset = + new FsCheckpointStateToolset( + new Path("test-path"), new TestDuplicatingFileSystem()); + + final boolean canFastDuplicate = + 
stateToolset.canFastDuplicate( + new FileStateHandle(new Path("test-path", "test-file"), 0)); + assertThat(canFastDuplicate).isFalse(); + } + + @Test + void testDuplicating() throws IOException { + final TestDuplicatingFileSystem fs = new TestDuplicatingFileSystem(); + final FsCheckpointStateToolset stateToolset = + new FsCheckpointStateToolset(new Path("test-path"), fs); + + final List<StreamStateHandle> duplicated = + stateToolset.duplicate( + Arrays.asList( + new FileStateHandle(new Path("old-test-path", "test-file1"), 0), + new FileStateHandle(new Path("old-test-path", "test-file2"), 0), + new RelativeFileStateHandle( + new Path("old-test-path", "test-file3"), "test-file3", 0))); + + assertThat(duplicated) + .containsExactly( + new FileStateHandle(new Path("test-path", "test-file1"), 0), + new FileStateHandle(new Path("test-path", "test-file2"), 0), + new RelativeFileStateHandle( + new Path("test-path", "test-file3"), "test-file3", 0)); + } + + private static final class TestDuplicatingFileSystem implements DuplicatingFileSystem { + + @Override + public boolean canFastDuplicate(Path source, Path destination) throws IOException { + return !source.equals(destination); + } + + @Override + public void duplicate(List<CopyRequest> requests) throws IOException {} + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java index 7b24f3d..2c2fbdc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.DuplicatingFileSystem; import org.apache.flink.core.fs.FileSystem; import 
org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; @@ -26,19 +27,24 @@ import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; +import org.apache.flink.testutils.TestFileSystem; import org.junit.Test; import javax.annotation.Nonnull; import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Arrays; import java.util.List; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -280,6 +286,42 @@ public class FsCheckpointStorageAccessTest extends AbstractFileCheckpointStorage assertTrue(fileSystem instanceof LocalFileSystem); } + @Test + public void testNotDuplicationCheckpointStateToolset() throws Exception { + CheckpointStorageAccess checkpointStorage = createCheckpointStorage(randomTempPath()); + assertThat( + checkpointStorage.createTaskOwnedCheckpointStateToolset(), + instanceOf(NotDuplicatingCheckpointStateToolset.class)); + } + + @Test + public void testDuplicationCheckpointStateToolset() throws Exception { + CheckpointStorageAccess checkpointStorage = + new FsCheckpointStorageAccess( + new TestDuplicatingFileSystem(), + randomTempPath(), + null, + new JobID(), + FILE_SIZE_THRESHOLD, + WRITE_BUFFER_SIZE); + + assertThat( + checkpointStorage.createTaskOwnedCheckpointStateToolset(), + 
instanceOf(FsCheckpointStateToolset.class)); + } + + private static final class TestDuplicatingFileSystem extends TestFileSystem + implements DuplicatingFileSystem { + + @Override + public boolean canFastDuplicate(Path source, Path destination) throws IOException { + return !source.equals(destination); + } + + @Override + public void duplicate(List<CopyRequest> requests) throws IOException {} + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java index 0331d5b..5ad7903 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java @@ -21,18 +21,21 @@ package org.apache.flink.runtime.state.testutils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.util.function.SupplierWithException; import javax.annotation.Nullable; import java.io.IOException; +import java.util.List; import static 
org.apache.flink.util.Preconditions.checkNotNull; @@ -114,6 +117,11 @@ public class BackendForTestStream extends MemoryStateBackend { public CheckpointStateOutputStream createTaskOwnedStateStream() { throw new UnsupportedOperationException(); } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + throw new UnsupportedOperationException(); + } } private static final class TestFactory @@ -130,5 +138,18 @@ public class BackendForTestStream extends MemoryStateBackend { CheckpointedStateScope scope) throws IOException { return streamFactory.get(); } + + @Override + public boolean canFastDuplicate( + StreamStateHandle stateHandle, CheckpointedStateScope scope) { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) + throws IOException { + throw new UnsupportedOperationException(); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java index 80f36b2..0f16c73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java @@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.testutils; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.StreamStateHandle; +import java.io.IOException; +import java.util.List; import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -42,4 +45,15 @@ public class TestCheckpointStreamFactory implements CheckpointStreamFactory { CheckpointedStateScope scope) { return 
supplier.get(); } + + @Override + public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java index fb4a8a0..91e78f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.ttl.mock; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; @@ -28,9 +29,13 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.StreamStateHandle; import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; + public class MockCheckpointStorage implements CheckpointStorage { @Override public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) { @@ -69,6 +74,19 @@ public class MockCheckpointStorage implements CheckpointStorage { } @Override + public boolean 
canFastDuplicate( + StreamStateHandle stateHandle, CheckpointedStateScope scope) { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) + throws IOException { + return null; + } + + @Override public CheckpointMetadataOutputStream createMetadataOutputStream() { return null; } @@ -99,6 +117,11 @@ public class MockCheckpointStorage implements CheckpointStorage { public CheckpointStateOutputStream createTaskOwnedStateStream() { return null; } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + return null; + } }; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java index 021e340..8f16b5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -24,10 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Set; /** {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO. 
*/ @@ -86,4 +88,15 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { return blockingStream; } + + @Override + public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java index 3422850..8986a5c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.sorted.state; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -73,4 +74,10 @@ class NonCheckpointingStorageAccess implements CheckpointStorageAccess { throw new UnsupportedOperationException( "Checkpoints are not supported in a single key state backend"); } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + throw new UnsupportedOperationException( + "Checkpoints are not supported in a single key state backend"); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index bcb4bdd..7c96d49 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -717,6 +718,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { return delegate.createTaskOwnedStateStream(); } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + return delegate.createTaskOwnedCheckpointStateToolset(); + } } private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java index ef7fc8a..81a1b78 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java @@ -24,11 +24,13 @@ import org.apache.flink.core.execution.JobClient; import 
org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; @@ -51,7 +53,9 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.time.Duration; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -224,7 +228,26 @@ public class UnalignedCheckpointFailureHandlingITCase { @Override public CheckpointStreamFactory resolveCheckpointStorageLocation( long checkpointId, CheckpointStorageLocationReference reference) { - return ign -> new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose); + return new CheckpointStreamFactory() { + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream( + CheckpointedStateScope scope) throws IOException { + return new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose); + } + + @Override + public boolean canFastDuplicate( + StreamStateHandle stateHandle, CheckpointedStateScope scope) { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) + 
throws IOException { + throw new UnsupportedOperationException(); + } + }; } @Override @@ -233,6 +256,11 @@ public class UnalignedCheckpointFailureHandlingITCase { } @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { + return delegate.createTaskOwnedCheckpointStateToolset(); + } + + @Override public boolean supportsHighlyAvailableStorage() { return delegate.supportsHighlyAvailableStorage(); }
