This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ec4acd2c99 [FLINK-27155][changelog] Reduce multiple reads to the same
Changelog file in the same taskmanager during restore
1ec4acd2c99 is described below
commit 1ec4acd2c993409092ddcb7121e2c9647bb4a086
Author: wangfeifan <[email protected]>
AuthorDate: Sun Apr 17 15:34:07 2022 +0800
[FLINK-27155][changelog] Reduce multiple reads to the same Changelog file
in the same taskmanager during restore
---
.../fs_state_changelog_configuration.html | 6 +
.../changelog/fs/ChangelogStreamHandleReader.java | 30 ++-
.../fs/ChangelogStreamHandleReaderWithCache.java | 221 ++++++++++++++++++
.../flink/changelog/fs/ChangelogStreamWrapper.java | 62 +++++
.../changelog/fs/FsStateChangelogOptions.java | 8 +
.../changelog/fs/FsStateChangelogStorage.java | 1 +
.../fs/FsStateChangelogStorageFactory.java | 5 +-
.../fs/FsStateChangelogStorageForRecovery.java | 17 +-
.../flink/changelog/fs/StateChangeFormat.java | 35 +--
...rRecovery.java => StateChangeIteratorImpl.java} | 30 +--
.../api/runtime/SavepointTaskStateManager.java | 10 +
.../TaskExecutorStateChangelogStoragesManager.java | 95 +++++++-
.../flink/runtime/state/TaskStateManager.java | 10 +
.../flink/runtime/state/TaskStateManagerImpl.java | 26 +++
.../changelog/StateChangelogStorageFactory.java | 2 +-
.../changelog/StateChangelogStorageLoader.java | 5 +-
.../InMemoryStateChangelogStorageFactory.java | 2 +-
.../flink/runtime/taskexecutor/TaskExecutor.java | 3 +-
...kExecutorStateChangelogStoragesManagerTest.java | 5 +-
.../runtime/state/TaskStateManagerImplTest.java | 4 +
.../flink/runtime/state/TestTaskStateManager.java | 21 ++
.../inmemory/StateChangelogStorageLoaderTest.java | 3 +-
.../runtime/util/JvmExitOnFatalErrorTest.java | 2 +
.../state/changelog/ChangelogStateBackend.java | 2 +
.../DeactivatedChangelogStateBackend.java | 3 +
.../restore/ChangelogBackendRestoreOperation.java | 17 +-
.../StateInitializationContextImplTest.java | 2 +
.../runtime/tasks/LocalStateForwardingTest.java | 2 +
.../streaming/runtime/tasks/StreamTaskTest.java | 3 +
.../test/state/ChangelogRecoveryCachingITCase.java | 253 +++++++++++++++++++++
30 files changed, 807 insertions(+), 78 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
index cc3b64dfcec..d5ca1bfadf1 100644
--- a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
@@ -38,6 +38,12 @@
<td>Integer</td>
<td>Number of threads to use to discard changelog (e.g.
pre-emptively uploaded unused state).</td>
</tr>
+ <tr>
+ <td><h5>dstl.dfs.download.local-cache.idle-timeout-ms</h5></td>
+ <td style="word-wrap: break-word;">10 min</td>
+ <td>Duration</td>
+ <td>Maximum idle time for cache files of distributed changelog
file, after which the cache files will be deleted.</td>
+ </tr>
<tr>
<td><h5>dstl.dfs.preemptive-persist-threshold</h5></td>
<td style="word-wrap: break-word;">5 mb</td>
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java
similarity index 50%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
copy to
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java
index f60ad065094..c633acf3bfd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java
@@ -16,29 +16,25 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.state.changelog;
+package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import java.io.DataInputStream;
import java.io.IOException;
-/**
- * A factory for {@link StateChangelogStorage}. Please use {@link
StateChangelogStorageLoader} to
- * create {@link StateChangelogStorage}.
- */
+import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrapAndSeek;
+
+/** Changelog handle reader used by {@link StateChangeIteratorImpl}. */
@Internal
-public interface StateChangelogStorageFactory {
- /** Get the identifier for user to use this changelog storage factory. */
- String getIdentifier();
+interface ChangelogStreamHandleReader extends AutoCloseable {
+
+ DataInputStream openAndSeek(StreamStateHandle handle, Long offset) throws
IOException;
- /** Create the storage based on a configuration. */
- StateChangelogStorage<?> createStorage(
- JobID jobID, Configuration configuration,
TaskManagerJobMetricGroup metricGroup)
- throws IOException;
+ @Override
+ default void close() throws Exception {}
- /** Create the storage for recovery. */
- StateChangelogStorageView<?> createStorageView() throws IOException;
+ ChangelogStreamHandleReader DIRECT_READER =
+ (handle, offset) -> wrapAndSeek(handle.openInputStream(), offset);
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
new file mode 100644
index 00000000000..1501cd7482f
--- /dev/null
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RefCountedFile;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrap;
+import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrapAndSeek;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.CACHE_IDLE_TIMEOUT;
+
+/**
+ * {@link ChangelogStreamHandleReader} that keeps a decompressed local copy of each changelog file
+ * residing on a distributed file system, so that repeated reads of the same file through this
+ * reader download it only once.
+ *
+ * <p>Cache files are reference counted: the cache itself holds one reference and every stream
+ * handed out by {@link #openAndSeek(StreamStateHandle, Long)} holds another one until it is closed.
+ * Once only the cache's own reference is left, a clean-up is scheduled to run after the idle
+ * timeout configured via {@link FsStateChangelogOptions#CACHE_IDLE_TIMEOUT}.
+ */
+class ChangelogStreamHandleReaderWithCache implements ChangelogStreamHandleReader {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ChangelogStreamHandleReaderWithCache.class);
+
+    private static final String CACHE_FILE_SUB_DIR = "dstl-cache-file";
+    private static final String CACHE_FILE_PREFIX = "dstl";
+
+    // A reference count of 1 means that only the cache itself still references the cache file.
+    private static final int NO_USING_REF_COUNT = 1;
+
+    private final File[] cacheDirectories;
+    private final AtomicInteger next;
+
+    private final ConcurrentHashMap<Path, RefCountedFile> cache = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService cacheCleanScheduler;
+    private final long cacheIdleMillis;
+
+    /**
+     * Creates the reader, its cache directories (one sub-directory per configured temp directory)
+     * and the single-threaded scheduler used to clean up idle cache files.
+     *
+     * @param config configuration providing the temp directories and the cache idle timeout
+     */
+    ChangelogStreamHandleReaderWithCache(Configuration config) {
+        this.cacheDirectories =
+                Arrays.stream(ConfigurationUtils.parseTempDirectories(config))
+                        .map(path -> new File(path, CACHE_FILE_SUB_DIR))
+                        .toArray(File[]::new);
+        Arrays.stream(this.cacheDirectories).forEach(File::mkdirs);
+
+        // Start at a random directory so that several readers do not all begin with the same one.
+        this.next = new AtomicInteger(new Random().nextInt(this.cacheDirectories.length));
+
+        this.cacheCleanScheduler =
+                SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", LOG);
+        this.cacheIdleMillis = config.get(CACHE_IDLE_TIMEOUT).toMillis();
+    }
+
+    /**
+     * Opens the changelog identified by {@code handle} positioned at {@code offset}.
+     *
+     * <p>Handles that cannot be cached (see {@link #canBeCached(StreamStateHandle)}) are read
+     * directly. Otherwise the file is served from the local cache, downloading it first if needed.
+     * Closing the returned stream gives the cache reference back.
+     *
+     * @param handle handle of the changelog file to read
+     * @param offset position to start reading from (in the decompressed data for cached files)
+     * @return a stream positioned at {@code offset}
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    @Override
+    public DataInputStream openAndSeek(StreamStateHandle handle, Long offset) throws IOException {
+        if (!canBeCached(handle)) {
+            return wrapAndSeek(handle.openInputStream(), offset);
+        }
+
+        final FileStateHandle fileHandle = (FileStateHandle) handle;
+        final Path dfsPath = fileHandle.getFilePath();
+        final RefCountedFile refCountedFile = getRefCountedFile(fileHandle);
+
+        final FileInputStream fin;
+        try {
+            fin = openAndSeek(refCountedFile, offset);
+        } catch (IOException | RuntimeException e) {
+            // No stream is handed out, so nobody would ever give back the reference taken above.
+            releaseCacheFile(dfsPath);
+            throw e;
+        }
+
+        LOG.debug(
+                "return cached file {} (rc={}) for {} (offset={})",
+                refCountedFile.getFile(),
+                refCountedFile.getReferenceCounter(),
+                handle.getStreamStateHandleID(),
+                offset);
+        return wrapStream(dfsPath, fin);
+    }
+
+    /** Only file handles located on a distributed file system are cached locally. */
+    private boolean canBeCached(StreamStateHandle handle) throws IOException {
+        if (handle instanceof FileStateHandle) {
+            FileStateHandle fileHandle = (FileStateHandle) handle;
+            return fileHandle.getFilePath().getFileSystem().isDistributedFS();
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Returns the cache file for the given handle with its reference count already incremented for
+     * the caller. The download happens inside {@code compute}, i.e. at most once per path.
+     */
+    private RefCountedFile getRefCountedFile(FileStateHandle fileHandle) {
+        return cache.compute(
+                fileHandle.getFilePath(),
+                (key, oldValue) -> {
+                    if (oldValue == null) {
+                        oldValue = downloadToCacheFile(fileHandle);
+                    }
+                    oldValue.retain();
+                    return oldValue;
+                });
+    }
+
+    /**
+     * Downloads (and decompresses) the changelog file into a new temp file in one of the cache
+     * directories, chosen round-robin. A partially written file is deleted if the download fails.
+     */
+    private RefCountedFile downloadToCacheFile(FileStateHandle fileHandle) {
+        // floorMod keeps the index non-negative even after the counter overflows.
+        File directory =
+                cacheDirectories[Math.floorMod(next.getAndIncrement(), cacheDirectories.length)];
+        File file;
+        try {
+            file = File.createTempFile(CACHE_FILE_PREFIX, null, directory);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+            return null;
+        }
+
+        try {
+            try (InputStream in = wrap(fileHandle.openInputStream());
+                    OutputStream out = new FileOutputStream(file)) {
+                LOG.debug(
+                        "download and decompress dstl file : {} to cache file : {}",
+                        fileHandle.getFilePath(),
+                        file.getPath());
+                IOUtils.copyBytes(in, out, false);
+            }
+            return new RefCountedFile(file);
+        } catch (IOException | RuntimeException e) {
+            // Both streams are closed at this point; do not leave a truncated file behind.
+            if (!file.delete() && file.exists()) {
+                LOG.warn("Unable to delete incomplete cache file {}", file.getPath());
+            }
+            ExceptionUtils.rethrow(e);
+            return null;
+        }
+    }
+
+    /** Opens the local cache file and positions it at {@code offset}; closes it on failure. */
+    private FileInputStream openAndSeek(RefCountedFile refCountedFile, long offset)
+            throws IOException {
+        FileInputStream fin = new FileInputStream(refCountedFile.getFile());
+        if (offset != 0) {
+            LOG.trace("seek to {}", offset);
+            try {
+                fin.getChannel().position(offset);
+            } catch (IOException | RuntimeException e) {
+                try {
+                    fin.close();
+                } catch (IOException closeFailure) {
+                    e.addSuppressed(closeFailure);
+                }
+                throw e;
+            }
+        }
+        return fin;
+    }
+
+    /**
+     * Wraps the cache file stream so that closing it gives the cache reference back. The reference
+     * is released only once, even if {@code close()} is called repeatedly.
+     */
+    private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) {
+        return new DataInputStream(new BufferedInputStream(fin)) {
+            private boolean released;
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    super.close();
+                } finally {
+                    if (!released) {
+                        released = true;
+                        releaseCacheFile(dfsPath);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Gives back one reference to the cache file of {@code dfsPath}. If only the cache's own
+     * reference remains afterwards, a clean-up is scheduled after the idle timeout.
+     */
+    private void releaseCacheFile(Path dfsPath) {
+        cache.computeIfPresent(
+                dfsPath,
+                (key, value) -> {
+                    value.release();
+                    if (value.getReferenceCounter() == NO_USING_REF_COUNT) {
+                        cacheCleanScheduler.schedule(
+                                () -> cleanCacheFile(dfsPath),
+                                cacheIdleMillis,
+                                TimeUnit.MILLISECONDS);
+                    }
+                    return value;
+                });
+    }
+
+    /** Drops the cache entry of {@code dfsPath} unless a reader has started using it again. */
+    private void cleanCacheFile(Path dfsPath) {
+        cache.computeIfPresent(
+                dfsPath,
+                (key, value) -> {
+                    if (value.getReferenceCounter() == NO_USING_REF_COUNT) {
+                        LOG.debug(
+                                "clean cached file : {} after {}ms idle",
+                                value.getFile().getPath(),
+                                cacheIdleMillis);
+                        // Release the cache's own reference; presumably this deletes the file.
+                        value.release();
+                        // remove the cache file
+                        return null;
+                    } else {
+                        return value;
+                    }
+                });
+    }
+
+    /**
+     * Stops the clean-up scheduler and deletes all remaining cache files. Deletion continues past
+     * individual failures; the first failure is thrown at the end with the others suppressed.
+     */
+    @Override
+    public void close() throws Exception {
+        cacheCleanScheduler.shutdownNow();
+        if (!cacheCleanScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+            LOG.warn(
+                    "Unable to cleanly shutdown cache clean scheduler of "
+                            + "ChangelogStreamHandleReaderWithCache in 5s");
+        }
+
+        IOException firstFailure = null;
+        Iterator<RefCountedFile> iterator = cache.values().iterator();
+        while (iterator.hasNext()) {
+            RefCountedFile cacheFile = iterator.next();
+            iterator.remove();
+            LOG.debug("cleanup on close: {}", cacheFile.getFile().toPath());
+            try {
+                Files.deleteIfExists(cacheFile.getFile().toPath());
+            } catch (IOException e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                } else {
+                    firstFailure.addSuppressed(e);
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+}
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamWrapper.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamWrapper.java
new file mode 100644
index 00000000000..b4e724b8f41
--- /dev/null
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Helpers that turn a raw changelog stream into a {@link DataInputViewStreamWrapper}. The first
+ * byte of the raw stream is a flag: {@code 1} means the remainder is Snappy-compressed.
+ */
+class ChangelogStreamWrapper {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStreamWrapper.class);
+
+    /**
+     * Consumes the compression flag and wraps the stream, decompressing it if the flag is set.
+     * Closing the returned wrapper also closes the underlying stream. If wrapping fails, the
+     * given stream is closed before the exception is propagated.
+     *
+     * @param stream raw changelog stream positioned at the compression flag
+     * @return wrapper over the (decompressed) changelog data
+     * @throws IOException if the flag cannot be read or the decompressor cannot be set up
+     */
+    static DataInputViewStreamWrapper wrap(InputStream stream) throws IOException {
+        BufferedInputStream bufferedStream = new BufferedInputStream(stream);
+        try {
+            boolean compressed = bufferedStream.read() == 1;
+            return new DataInputViewStreamWrapper(
+                    compressed
+                            ? SnappyStreamCompressionDecorator.INSTANCE.decorateWithCompression(
+                                    bufferedStream)
+                            : bufferedStream) {
+                @Override
+                public void close() throws IOException {
+                    try {
+                        super.close();
+                    } finally {
+                        bufferedStream.close();
+                    }
+                }
+            };
+        } catch (IOException | RuntimeException e) {
+            // The caller never receives a wrapper, so the stream must be closed here.
+            closeAfterFailure(bufferedStream, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Wraps the stream as {@link #wrap(InputStream)} does and skips {@code offset} bytes of the
+     * (decompressed) data. The stream is closed if skipping fails.
+     *
+     * @param stream raw changelog stream positioned at the compression flag
+     * @param offset number of bytes to skip; must not be negative
+     * @return wrapper positioned at {@code offset}
+     * @throws IOException if the offset is negative or the stream ends before {@code offset}
+     */
+    static DataInputViewStreamWrapper wrapAndSeek(InputStream stream, long offset)
+            throws IOException {
+        DataInputViewStreamWrapper wrappedStream = wrap(stream);
+        if (offset != 0) {
+            LOG.debug("seek to {}", offset);
+            try {
+                if (offset < 0) {
+                    throw new IOException("Negative changelog offset: " + offset);
+                }
+                // skipBytesToRead takes an int; skip in chunks instead of truncating the offset.
+                long remaining = offset;
+                while (remaining > 0) {
+                    int chunk = (int) Math.min(remaining, Integer.MAX_VALUE);
+                    wrappedStream.skipBytesToRead(chunk);
+                    remaining -= chunk;
+                }
+            } catch (IOException | RuntimeException e) {
+                closeAfterFailure(wrappedStream, e);
+                throw e;
+            }
+        }
+        return wrappedStream;
+    }
+
+    /** Closes {@code stream}, attaching a close failure to {@code cause} as suppressed. */
+    private static void closeAfterFailure(InputStream stream, Exception cause) {
+        try {
+            stream.close();
+        } catch (IOException closeFailure) {
+            cause.addSuppressed(closeFailure);
+        }
+    }
+}
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
index 25791f1cea2..4d05962f2ee 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
@@ -144,4 +144,12 @@ public class FsStateChangelogOptions {
.defaultValue(Duration.ofMillis(500))
.withDescription(
"Delay before the next attempt (if the failure was
not caused by a timeout).");
+
+ /**
+ * Idle time after which a locally cached changelog file is deleted (default: 10 minutes).
+ * Note that the option is a {@link Duration} even though its key ends in {@code -ms}.
+ */
+ public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =
+ ConfigOptions.key("dstl.dfs.download.local-cache.idle-timeout-ms")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription(
+ "Maximum idle time for cache files of distributed
changelog file, "
+ + "after which the cache files will be
deleted.");
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 972135b7845..de7718521a0 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -112,6 +112,7 @@ public class FsStateChangelogStorage extends
FsStateChangelogStorageForRecovery
StateChangeUploadScheduler uploader,
long preEmptivePersistThresholdInBytes,
TaskChangelogRegistry changelogRegistry) {
+ super(ChangelogStreamHandleReader.DIRECT_READER);
this.preEmptivePersistThresholdInBytes =
preEmptivePersistThresholdInBytes;
this.changelogRegistry = changelogRegistry;
this.uploader = uploader;
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
index 561a25de153..c81931241a5 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
@@ -53,8 +53,9 @@ public class FsStateChangelogStorageFactory implements
StateChangelogStorageFact
}
@Override
- public StateChangelogStorageView<?> createStorageView() {
- return new FsStateChangelogStorageForRecovery();
+ public StateChangelogStorageView<?> createStorageView(Configuration
configuration) {
+ return new FsStateChangelogStorageForRecovery(
+ new ChangelogStreamHandleReaderWithCache(configuration));
}
public static void configure(
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
index 377ac81e053..de40328abac 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
@@ -33,8 +33,23 @@ import javax.annotation.concurrent.ThreadSafe;
public class FsStateChangelogStorageForRecovery
implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
+ private final ChangelogStreamHandleReader changelogStreamHandleReader;
+
+ public FsStateChangelogStorageForRecovery(
+ ChangelogStreamHandleReader changelogStreamHandleReader) {
+ this.changelogStreamHandleReader = changelogStreamHandleReader;
+ }
+
@Override
public StateChangelogHandleReader<ChangelogStateHandleStreamImpl>
createReader() {
- return new StateChangelogHandleStreamHandleReader(new
StateChangeFormat());
+ return new StateChangelogHandleStreamHandleReader(
+ new StateChangeIteratorImpl(changelogStreamHandleReader));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (changelogStreamHandleReader != null) {
+ changelogStreamHandleReader.close();
+ }
}
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index ff2149301e4..7a672da4ba4 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -18,13 +18,8 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
-import
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
@@ -32,9 +27,8 @@ import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -48,8 +42,7 @@ import static java.util.Comparator.comparing;
/** Serialization format for state changes. */
@Internal
-public class StateChangeFormat
- implements StateChangelogHandleStreamHandleReader.StateChangeIterator {
+public class StateChangeFormat {
private static final Logger LOG =
LoggerFactory.getLogger(StateChangeFormat.class);
Map<StateChangeSet, Long> write(OutputStreamWithPos os,
Collection<StateChangeSet> changeSets)
@@ -86,15 +79,8 @@ public class StateChangeFormat
}
}
- @Override
- public CloseableIterator<StateChange> read(StreamStateHandle handle, long
offset)
- throws IOException {
- FSDataInputStream stream = handle.openInputStream();
- DataInputViewStreamWrapper input = wrap(stream);
- if (offset != 0) {
- LOG.debug("seek from {} to {}", stream.getPos(), offset);
- input.skipBytesToRead((int) offset);
- }
+ CloseableIterator<StateChange> read(DataInputStream input) throws
IOException {
+
return new CloseableIterator<StateChange>() {
int numUnreadGroups = input.readInt();
int numLeftInGroup = numUnreadGroups-- == 0 ? 0 : input.readInt();
@@ -141,18 +127,9 @@ public class StateChangeFormat
@Override
public void close() throws Exception {
- LOG.trace("close {}", stream);
- stream.close();
+ LOG.trace("close {}", input);
+ input.close();
}
};
}
-
- private DataInputViewStreamWrapper wrap(InputStream stream) throws
IOException {
- stream = new BufferedInputStream(stream);
- boolean compressed = stream.read() == 1;
- return new DataInputViewStreamWrapper(
- compressed
- ?
SnappyStreamCompressionDecorator.INSTANCE.decorateWithCompression(stream)
- : stream);
- }
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorImpl.java
similarity index 53%
copy from
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
copy to
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorImpl.java
index 377ac81e053..720cbcb75f3 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorImpl.java
@@ -18,23 +18,27 @@
package org.apache.flink.changelog.fs;
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.StateChange;
import
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
-import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
-import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
+import org.apache.flink.util.CloseableIterator;
-import javax.annotation.concurrent.ThreadSafe;
+import java.io.IOException;
-/** Filesystem-based implementation of {@link StateChangelogStorage} just for
recovery. */
-@Experimental
-@ThreadSafe
-public class FsStateChangelogStorageForRecovery
- implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
+/** StateChangeIterator default implementation. */
+class StateChangeIteratorImpl
+ implements StateChangelogHandleStreamHandleReader.StateChangeIterator {
+
+ private final ChangelogStreamHandleReader changelogStreamHandleReader;
+
+ public StateChangeIteratorImpl(ChangelogStreamHandleReader
changelogStreamHandleReader) {
+ this.changelogStreamHandleReader = changelogStreamHandleReader;
+ }
@Override
- public StateChangelogHandleReader<ChangelogStateHandleStreamImpl>
createReader() {
- return new StateChangelogHandleStreamHandleReader(new
StateChangeFormat());
+ public CloseableIterator<StateChange> read(StreamStateHandle handle, long
offset)
+ throws IOException {
+ return new StateChangeFormat()
+ .read(changelogStreamHandleReader.openAndSeek(handle, offset));
}
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index 7c4c592c4fd..cb6193e7724 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.state.api.runtime;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
@@ -27,7 +28,9 @@ import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -104,6 +107,13 @@ final class SavepointTaskStateManager implements
TaskStateManager {
return null;
}
+ /** Always returns {@code null}; both arguments are ignored. */
+ @Nullable
+ @Override
+ public StateChangelogStorageView<?> getStateChangelogStorageView(
+ Configuration configuration, ChangelogStateHandle
changelogStateHandle) {
+ return null;
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) {
throw new UnsupportedOperationException(MSG);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index d5cf1711a93..59c3c090bf3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -22,8 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
@@ -55,6 +58,15 @@ public class TaskExecutorStateChangelogStoragesManager {
@GuardedBy("lock")
private final Map<JobID, Optional<StateChangelogStorage<?>>>
changelogStoragesByJobId;
+ /**
+ * This map holds all state changelog storage views of {@link ChangelogStateHandleStreamImpl}
+ * for tasks running on the task manager / executor that owns this instance. Unlike
+ * {@code changelogStoragesByJobId}, the values are not wrapped in an {@code Optional}.
+ */
+ @GuardedBy("lock")
+ private final Map<JobID,
StateChangelogStorageView<ChangelogStateHandleStreamImpl>>
+ changelogStorageViewsByJobId;
+
@GuardedBy("lock")
private boolean closed;
@@ -65,6 +77,7 @@ public class TaskExecutorStateChangelogStoragesManager {
public TaskExecutorStateChangelogStoragesManager() {
this.changelogStoragesByJobId = new HashMap<>();
+ this.changelogStorageViewsByJobId = new HashMap<>();
this.closed = false;
// register a shutdown hook
@@ -120,7 +133,7 @@ public class TaskExecutorStateChangelogStoragesManager {
}
}
- public void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
+ private void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
LOG.debug("Releasing state changelog storage under job id {}.", jobId);
Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
synchronized (lock) {
@@ -135,28 +148,100 @@ public class TaskExecutorStateChangelogStoragesManager {
}
}
+    /**
+     * Returns the changelog storage view registered for the given job, loading and registering
+     * one from the state handle on first use.
+     *
+     * @param jobID job to return the storage view for
+     * @param configuration configuration passed to the loader when a new view is created
+     * @param changelogStateHandle handle from which the kind of storage view is determined
+     * @return the storage view for the job, or {@code null} if the loader returned none
+     * @throws IOException if loading the storage view fails
+     * @throws IllegalStateException if this manager has already been shut down
+     */
+    @Nullable
+    StateChangelogStorageView<?> stateChangelogStorageViewForJob(
+            @Nonnull JobID jobID,
+            Configuration configuration,
+            ChangelogStateHandle changelogStateHandle)
+            throws IOException {
+        // This implementation assumes there is only one production implementation of DSTL
+        // (FsStateChangelogStorage). Maybe we should change the type of
+        // changelogStorageViewsByJobId to map<jobId, map<dstl-identifier, dstl>> when there is
+        // another implementation.
+
+        synchronized (lock) {
+            // 'closed' is guarded by 'lock'. Checking it here prevents a view from being
+            // registered after shutdown() has drained the map, which would never be released.
+            if (closed) {
+                throw new IllegalStateException(
+                        "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
+                                + "register a new StateChangelogStorageView.");
+            }
+
+            StateChangelogStorageView<ChangelogStateHandleStreamImpl> storageView =
+                    changelogStorageViewsByJobId.get(jobID);
+
+            if (storageView == null) {
+                StateChangelogStorageView<?> loaded =
+                        StateChangelogStorageLoader.loadFromStateHandle(
+                                configuration, changelogStateHandle);
+                // Relies on the single-implementation assumption described above.
+                @SuppressWarnings("unchecked")
+                StateChangelogStorageView<ChangelogStateHandleStreamImpl> typedView =
+                        (StateChangelogStorageView<ChangelogStateHandleStreamImpl>) loaded;
+                storageView = typedView;
+                changelogStorageViewsByJobId.put(jobID, storageView);
+
+                LOG.debug(
+                        "Registered new state changelog storage view for job {} : {}.",
+                        jobID,
+                        loaded);
+            } else {
+                LOG.debug(
+                        "Found existing state changelog storage view for job {}: {}.",
+                        jobID,
+                        storageView);
+            }
+
+            return storageView;
+        }
+    }
+
+ /**
+ * Removes the state changelog storage view registered for the given job, if any, and closes
+ * it. Does nothing if this manager is already closed.
+ *
+ * @param jobID the job whose storage view should be released
+ */
+ private void releaseStateChangelogStorageViewForJob(@Nonnull JobID jobID) {
+ LOG.debug("Releasing state changelog storage view under job id {}.",
jobID);
+ StateChangelogStorageView<ChangelogStateHandleStreamImpl>
cleanupStorageView;
+ synchronized (lock) {
+ // shutdown() sets 'closed' and releases all registered views itself.
+ if (closed) {
+ return;
+ }
+ cleanupStorageView = changelogStorageViewsByJobId.remove(jobID);
+ }
+
+ // The view is closed outside of the lock.
+ if (cleanupStorageView != null) {
+ doRelease(cleanupStorageView);
+ }
+ }
+
+ /**
+ * Releases everything this manager holds for the given job: first its state changelog
+ * storage, then its state changelog storage view.
+ *
+ * @param jobID the job whose resources should be released
+ */
+ public void releaseResourcesForJob(@Nonnull JobID jobID) {
+ releaseStateChangelogStorageForJob(jobID);
+ releaseStateChangelogStorageViewForJob(jobID);
+ }
+
public void shutdown() {
- HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease;
+ HashMap<JobID, Optional<StateChangelogStorage<?>>> toReleaseStorage;
+ HashMap<JobID,
StateChangelogStorageView<ChangelogStateHandleStreamImpl>>
+ toReleaseStorageView;
synchronized (lock) {
if (closed) {
return;
}
closed = true;
- toRelease = new HashMap<>(changelogStoragesByJobId);
+ toReleaseStorage = new HashMap<>(changelogStoragesByJobId);
+ toReleaseStorageView = new HashMap<>(changelogStorageViewsByJobId); // copy under lock
changelogStoragesByJobId.clear();
+ changelogStorageViewsByJobId.clear();
}
ShutdownHookUtil.removeShutdownHook(shutdownHook,
getClass().getSimpleName(), LOG);
LOG.info("Shutting down TaskExecutorStateChangelogStoragesManager.");
- for (Map.Entry<JobID, Optional<StateChangelogStorage<?>>> entry :
toRelease.entrySet()) {
+ for (Map.Entry<JobID, Optional<StateChangelogStorage<?>>> entry :
+ toReleaseStorage.entrySet()) {
entry.getValue().ifPresent(this::doRelease);
}
+ for (Map.Entry<JobID,
StateChangelogStorageView<ChangelogStateHandleStreamImpl>> entry :
+ toReleaseStorageView.entrySet()) {
+ doRelease(entry.getValue()); // views are closed after the lock is released
+ }
}
- private void doRelease(StateChangelogStorage<?> storage) {
+ private void doRelease(StateChangelogStorageView<?> storage) {
if (storage != null) {
try {
storage.close();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index 60041104923..7c979b6b0d8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
@@ -26,7 +27,9 @@ import
org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -101,4 +104,11 @@ public interface TaskStateManager extends
CheckpointListener, AutoCloseable {
/** Returns the configured state changelog storage for this task. */
@Nullable
StateChangelogStorage<?> getStateChangelogStorage();
+
+ /**
+ * Returns the state changelog storage view of given {@link
ChangelogStateHandle} for this task.
+ */
+ @Nullable
+ StateChangelogStorageView<?> getStateChangelogStorageView(
+ Configuration configuration, ChangelogStateHandle
changelogStateHandle);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 2d14d37e01a..a82b55b155b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
@@ -30,8 +31,11 @@ import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +43,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -75,6 +80,8 @@ public class TaskStateManagerImpl implements TaskStateManager
{
/** The changelog storage where the manager reads and writes the changelog
*/
@Nullable private final StateChangelogStorage<?> stateChangelogStorage;
+ private final TaskExecutorStateChangelogStoragesManager
changelogStoragesManager;
+
/** The checkpoint responder through which this manager can report to the
job manager. */
private final CheckpointResponder checkpointResponder;
@@ -85,6 +92,7 @@ public class TaskStateManagerImpl implements TaskStateManager
{
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
@Nullable StateChangelogStorage<?> stateChangelogStorage,
+ @Nonnull TaskExecutorStateChangelogStoragesManager
changelogStoragesManager,
@Nullable JobManagerTaskRestore jobManagerTaskRestore,
@Nonnull CheckpointResponder checkpointResponder) {
this(
@@ -92,6 +100,7 @@ public class TaskStateManagerImpl implements
TaskStateManager {
executionAttemptID,
localStateStore,
stateChangelogStorage,
+ changelogStoragesManager,
jobManagerTaskRestore,
checkpointResponder,
new SequentialChannelStateReaderImpl(
@@ -105,12 +114,14 @@ public class TaskStateManagerImpl implements
TaskStateManager {
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
@Nullable StateChangelogStorage<?> stateChangelogStorage,
+ @Nonnull TaskExecutorStateChangelogStoragesManager
changelogStoragesManager,
@Nullable JobManagerTaskRestore jobManagerTaskRestore,
@Nonnull CheckpointResponder checkpointResponder,
@Nonnull SequentialChannelStateReaderImpl
sequentialChannelStateReader) {
this.jobId = jobId;
this.localStateStore = localStateStore;
this.stateChangelogStorage = stateChangelogStorage;
+ this.changelogStoragesManager = changelogStoragesManager;
this.jobManagerTaskRestore = jobManagerTaskRestore;
this.executionAttemptID = executionAttemptID;
this.checkpointResponder = checkpointResponder;
@@ -244,6 +255,21 @@ public class TaskStateManagerImpl implements
TaskStateManager {
return stateChangelogStorage;
}
+ @Nullable
+ @Override
+ public StateChangelogStorageView<?> getStateChangelogStorageView(
+ Configuration configuration, ChangelogStateHandle
changelogStateHandle) {
+ StateChangelogStorageView<?> storageView = null;
+ try { // the manager looks the view up by this task's job id
+ storageView =
+ changelogStoragesManager.stateChangelogStorageViewForJob(
+ jobId, configuration, changelogStateHandle);
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e); // the interface method declares no checked exception
+ }
+ return storageView;
+ }
+
/** Tracking when local state can be confirmed and disposed. */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
index f60ad065094..be86a416e6c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
@@ -40,5 +40,5 @@ public interface StateChangelogStorageFactory {
throws IOException;
/** Create the storage for recovery. */
- StateChangelogStorageView<?> createStorageView() throws IOException;
+ StateChangelogStorageView<?> createStorageView(Configuration
configuration) throws IOException;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 7947e83e145..622c09f9956 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -106,7 +106,8 @@ public class StateChangelogStorageLoader {
@Nonnull
public static StateChangelogStorageView<?> loadFromStateHandle(
- ChangelogStateHandle changelogStateHandle) throws IOException {
+ Configuration configuration, ChangelogStateHandle
changelogStateHandle)
+ throws IOException {
StateChangelogStorageFactory factory =
STATE_CHANGELOG_STORAGE_FACTORIES.get(changelogStateHandle.getStorageIdentifier());
if (factory == null) {
@@ -120,7 +121,7 @@ public class StateChangelogStorageLoader {
"Creating a changelog storage with name '{}' to restore
from '{}'.",
changelogStateHandle.getStorageIdentifier(),
changelogStateHandle.getClass().getSimpleName());
- return factory.createStorageView();
+ return factory.createStorageView(configuration);
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
index d09d638d242..786a6a7be29 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
@@ -41,7 +41,7 @@ public class InMemoryStateChangelogStorageFactory implements
StateChangelogStora
}
@Override
- public StateChangelogStorageView<?> createStorageView() {
+ public StateChangelogStorageView<?> createStorageView(Configuration
configuration) {
return new InMemoryStateChangelogStorage();
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3b04d8a7a40..26b274f5a5c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -713,6 +713,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
tdd.getExecutionAttemptId(),
localStateStore,
changelogStorage,
+ changelogStoragesManager,
taskRestore,
checkpointResponder);
@@ -1808,7 +1809,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
closeJob(job, cause);
});
taskManagerMetricGroup.removeJobMetricsGroup(jobId);
- changelogStoragesManager.releaseStateChangelogStorageForJob(jobId);
+ changelogStoragesManager.releaseResourcesForJob(jobId);
currentSlotOfferPerJob.remove(jobId);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index cf6ef1deb3e..b0efaa9c427 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -82,7 +82,7 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
jobId1, configuration,
createUnregisteredTaskManagerJobMetricGroup());
Assert.assertTrue(storage1 instanceof TestStateChangelogStorage);
Assert.assertFalse(((TestStateChangelogStorage) storage1).closed);
- manager.releaseStateChangelogStorageForJob(jobId1);
+ manager.releaseResourcesForJob(jobId1);
Assert.assertTrue(((TestStateChangelogStorage) storage1).closed);
StateChangelogStorage<?> storage2 =
@@ -212,7 +212,8 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
}
@Override
- public StateChangelogStorageView<?> createStorageView() throws
IOException {
+ public StateChangelogStorageView<?> createStorageView(Configuration
configuration)
+ throws IOException {
return new TestStateChangelogStorage();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index 5eb2664b5d3..9b3f6f17342 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -282,6 +282,7 @@ public class TaskStateManagerImplTest extends TestLogger {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
null,
+ new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
new TestCheckpointResponder());
Assert.assertTrue(stateManager.isTaskDeployedAsFinished());
@@ -294,6 +295,7 @@ public class TaskStateManagerImplTest extends TestLogger {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
null,
+ new TaskExecutorStateChangelogStoragesManager(),
null,
new TestCheckpointResponder());
Assert.assertFalse(emptyStateManager.getRestoreCheckpointId().isPresent());
@@ -304,6 +306,7 @@ public class TaskStateManagerImplTest extends TestLogger {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
null,
+ new TaskExecutorStateChangelogStoragesManager(),
new JobManagerTaskRestore(2, new TaskStateSnapshot()),
new TestCheckpointResponder());
Assert.assertEquals(2L, (long)
nonEmptyStateManager.getRestoreCheckpointId().get());
@@ -322,6 +325,7 @@ public class TaskStateManagerImplTest extends TestLogger {
executionAttemptID,
localStateStore,
stateChangelogStorage,
+ new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
checkpointResponderMock);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 64b3da7a327..a98d0fd7b06 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -29,14 +30,19 @@ import
org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -223,6 +229,21 @@ public class TestTaskStateManager implements
TaskStateManager {
return stateChangelogStorage;
}
+ @Nullable
+ @Override
+ public StateChangelogStorageView<?> getStateChangelogStorageView(
+ Configuration configuration, ChangelogStateHandle changelogStateHandle) {
+ StateChangelogStorageView<?> storageView = null;
+ try { // loads a new view on every call; this test double keeps no per-job cache
+ storageView =
+ StateChangelogStorageLoader.loadFromStateHandle(
+ configuration, changelogStateHandle);
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e); // the interface method declares no checked exception
+ }
+ return storageView;
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
this.notifiedCompletedCheckpointId = checkpointId;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 988a707242c..6c091eeb546 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -125,7 +125,8 @@ public class StateChangelogStorageLoaderTest {
}
@Override
- public StateChangelogStorageView<?> createStorageView() throws
IOException {
+ public StateChangelogStorageView<?> createStorageView(Configuration
configuration)
+ throws IOException {
return new TestStateChangelogStorage();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 2e142af9593..d86e2219af5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
@@ -219,6 +220,7 @@ public class JvmExitOnFatalErrorTest extends TestLogger {
executionAttemptID,
localStateStore,
changelogStorage,
+ new
TaskExecutorStateChangelogStoragesManager(),
null,
mock(CheckpointResponder.class));
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index b73f7f7a5a1..b186cd4bedb 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -90,7 +90,9 @@ public class ChangelogStateBackend extends
AbstractChangelogStateBackend
ChangelogStateFactory changelogStateFactory = new
ChangelogStateFactory();
CheckpointableKeyedStateBackend<K> keyedStateBackend =
ChangelogBackendRestoreOperation.restore(
+ env.getTaskManagerInfo().getConfiguration(),
env.getUserCodeClassLoader().asClassLoader(),
+ env.getTaskStateManager(),
stateBackendHandles,
baseBackendBuilder,
(baseBackend, baseState) ->
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
index d8f1ec70e47..88808064f9e 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
@@ -66,8 +66,11 @@ public class DeactivatedChangelogStateBackend extends
AbstractChangelogStateBack
// So we need to rebound the checkpoint id to the real checkpoint id
here.
stateBackendHandles = reboundCheckpoint(stateBackendHandles);
ChangelogStateFactory changelogStateFactory = new
ChangelogStateFactory();
+
return ChangelogBackendRestoreOperation.restore(
+ env.getTaskManagerInfo().getConfiguration(),
env.getUserCodeClassLoader().asClassLoader(),
+ env.getTaskStateManager(),
stateBackendHandles,
baseBackendBuilder,
(baseBackend, baseState) ->
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
index 832ca0fff40..ef237019678 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
@@ -18,14 +18,15 @@
package org.apache.flink.state.changelog.restore;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
-import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
@@ -60,7 +61,9 @@ public class ChangelogBackendRestoreOperation {
Exception> {}
public static <K> CheckpointableKeyedStateBackend<K> restore(
+ Configuration configuration,
ClassLoader classLoader,
+ TaskStateManager taskStateManager,
Collection<ChangelogStateBackendHandle> stateHandles,
BaseBackendBuilder<K> baseBackendBuilder,
ChangelogRestoreTargetBuilder<K> changelogRestoreTargetBuilder)
@@ -72,7 +75,12 @@ public class ChangelogBackendRestoreOperation {
for (ChangelogStateBackendHandle handle : stateHandles) {
if (handle != null) { // null is empty state (no change)
- readBackendHandle(changelogRestoreTarget, handle, classLoader);
+ readBackendHandle(
+ configuration,
+ taskStateManager,
+ changelogRestoreTarget,
+ handle,
+ classLoader);
}
}
return changelogRestoreTarget.getRestoredKeyedStateBackend();
@@ -80,6 +88,8 @@ public class ChangelogBackendRestoreOperation {
@SuppressWarnings("unchecked")
private static <T extends ChangelogStateHandle> void readBackendHandle(
+ Configuration configuration,
+ TaskStateManager taskStateManager,
ChangelogRestoreTarget<?> changelogRestoreTarget,
ChangelogStateBackendHandle backendHandle,
ClassLoader classLoader)
@@ -89,7 +99,8 @@ public class ChangelogBackendRestoreOperation {
backendHandle.getNonMaterializedStateHandles()) {
StateChangelogHandleReader<T> changelogHandleReader =
(StateChangelogHandleReader<T>)
-
StateChangelogStorageLoader.loadFromStateHandle(changelogHandle)
+ taskStateManager
+
.getStateChangelogStorageView(configuration, changelogHandle)
.createReader();
try (CloseableIterator<StateChange> changes =
changelogHandleReader.getChanges((T) changelogHandle)) {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index bacc3a6cb06..916caba04b1 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -47,6 +47,7 @@ import
org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
@@ -169,6 +170,7 @@ public class StateInitializationContextImplTest {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
new InMemoryStateChangelogStorage(),
+ new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
mock(CheckpointResponder.class));
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 318256fea7f..c7f13aad326 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
+import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
@@ -251,6 +252,7 @@ public class LocalStateForwardingTest extends TestLogger {
executionAttemptID,
taskLocalStateStore,
stateChangelogStorage,
+ new TaskExecutorStateChangelogStoragesManager(),
null,
checkpointResponder);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 782344dfe20..8777de72fcc 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
+import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
@@ -789,6 +790,7 @@ public class StreamTaskTest extends TestLogger {
createExecutionAttemptId(),
mock(TaskLocalStateStoreImpl.class),
new InMemoryStateChangelogStorage(),
+ new TaskExecutorStateChangelogStoragesManager(),
null,
checkpointResponder);
@@ -982,6 +984,7 @@ public class StreamTaskTest extends TestLogger {
createExecutionAttemptId(),
mock(TaskLocalStateStoreImpl.class),
new InMemoryStateChangelogStorage(),
+ new TaskExecutorStateChangelogStoragesManager(),
null,
checkpointResponder);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
new file mode 100644
index 00000000000..01f6353473c
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static java.util.Collections.singletonMap;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.CACHE_IDLE_TIMEOUT;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;
+import static
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
+import static
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
+import static
org.apache.flink.configuration.StateChangelogOptions.ENABLE_STATE_CHANGE_LOG;
+import static
org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL;
+import static
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
+import static
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
+import static
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_MODE;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_UNALIGNED;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Tests caching of changelog segments downloaded during recovery. */
+public class ChangelogRecoveryCachingITCase extends TestLogger {
+ /**
+  * Milliseconds the first job is left running before it is checkpointed and cancelled; high
+  * enough to build some state.
+  */
+ private static final int ACCUMULATE_TIME_MILLIS = 500;
+ /**
+  * Job parallelism and number of slots of the single TaskManager; high enough to trigger DSTL
+  * file multiplexing.
+  */
+ private static final int PARALLELISM = 10;
+
+ /** Supplies the temporary folders used for the changelog storage and the checkpoints. */
+ @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ /** File system created and registered in {@link #before()}; asserts each path is opened once. */
+ private OpenOnceFileSystem fileSystem;
+
+ /** Mini cluster started in {@link #before()} and stopped in {@link #after()}. */
+ private MiniClusterWithClientResource cluster;
+
+    /**
+     * Registers an {@link OpenOnceFileSystem} for the URI scheme of a fresh temporary folder,
+     * configures the file-system changelog storage with that folder and starts a mini cluster
+     * with one TaskManager offering {@link #PARALLELISM} slots.
+     */
+    @Before
+    public void before() throws Exception {
+        final File changelogDir = temporaryFolder.newFolder();
+
+        // Install the tracking file system before the cluster is started.
+        fileSystem = new OpenOnceFileSystem();
+        registerFileSystem(fileSystem, changelogDir.toURI().getScheme());
+
+        final Configuration clusterConf = new Configuration();
+        clusterConf.set(CACHE_IDLE_TIMEOUT, Duration.ofDays(365)); // cache forever
+        FsStateChangelogStorageFactory.configure(
+                clusterConf, changelogDir, Duration.ofMinutes(1), 10);
+
+        final MiniClusterResourceConfiguration clusterSpec =
+                new MiniClusterResourceConfiguration.Builder()
+                        .setConfiguration(clusterConf)
+                        .setNumberTaskManagers(1)
+                        .setNumberSlotsPerTaskManager(PARALLELISM)
+                        .build();
+        cluster = new MiniClusterWithClientResource(clusterSpec);
+        cluster.before();
+    }
+
+    /**
+     * Stops the mini cluster, if one was started, and re-initializes {@link FileSystem} with an
+     * empty configuration and no plugin manager.
+     *
+     * <p>The re-initialization runs in a {@code finally} block so that the file system registered
+     * in {@link #before()} is replaced even when shutting the cluster down throws.
+     */
+    @After
+    public void after() throws Exception {
+        try {
+            if (cluster != null) {
+                cluster.after();
+            }
+        } finally {
+            cluster = null;
+            FileSystem.initialize(new Configuration(), null);
+        }
+    }
+
+    /**
+     * Runs a job for {@link #ACCUMULATE_TIME_MILLIS}, checkpoints and cancels it, then submits a
+     * second job restoring from that checkpoint and waits until all of its tasks are running.
+     *
+     * <p>The duplicate-open check itself lives in {@link OpenOnceFileSystem#open}; this method
+     * only verifies at the end that at least one path was opened through that file system.
+     */
+    @Test
+    public void test() throws Exception {
+        final JobID initialJob = submit(configureJob(temporaryFolder.newFolder()), ignored -> {});
+        Thread.sleep(ACCUMULATE_TIME_MILLIS);
+        final String checkpointPath = checkpointAndCancel(initialJob);
+
+        final Consumer<JobGraph> restoreFromCheckpoint =
+                jobGraph -> jobGraph.setSavepointRestoreSettings(forPath(checkpointPath));
+        final JobID restoredJob =
+                submit(configureJob(temporaryFolder.newFolder()), restoreFromCheckpoint);
+        waitForAllTaskRunning(cluster.getMiniCluster(), restoredJob, true);
+        cluster.getClusterClient().cancel(restoredJob).get();
+
+        checkState(fileSystem.hasOpenedPaths());
+    }
+
+ private JobID submit(Configuration conf, Consumer<JobGraph> updateGraph)
+ throws InterruptedException, ExecutionException {
+ JobGraph jobGraph = createJobGraph(conf);
+ updateGraph.accept(jobGraph);
+ return cluster.getClusterClient().submitJob(jobGraph).get();
+ }
+
+    /**
+     * Creates the job graph of the test pipeline: a sequence source over the full {@code long}
+     * range, keyed by {@code value % 1000}, a map that stores each element in a keyed value state
+     * named {@code "state"}, and a discarding sink.
+     *
+     * @param conf configuration passed to the execution environment
+     */
+    private JobGraph createJobGraph(Configuration conf) {
+        final StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+        environment
+                .fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
+                .keyBy(element -> element % 1000)
+                .map(
+                        new RichMapFunction<Long, Long>() {
+                            @Override
+                            public Long map(Long value) throws Exception {
+                                // Every record overwrites the state of its key.
+                                final ValueStateDescriptor<Long> descriptor =
+                                        new ValueStateDescriptor<>("state", Long.class);
+                                getRuntimeContext().getState(descriptor).update(value);
+                                return value;
+                            }
+                        })
+                .addSink(new DiscardingSink<>());
+
+        return environment.getStreamGraph().getJobGraph();
+    }
+
+ private Configuration configureJob(File cpDir) {
+ Configuration conf = new Configuration();
+
+ conf.set(EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION);
+ conf.set(DEFAULT_PARALLELISM, PARALLELISM);
+ conf.set(ENABLE_STATE_CHANGE_LOG, true);
+ conf.set(CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
+ conf.set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10));
+ conf.set(CHECKPOINT_STORAGE, "filesystem");
+ conf.set(CHECKPOINTS_DIRECTORY, cpDir.toURI().toString());
+ conf.set(STATE_BACKEND, "hashmap");
+ conf.set(LOCAL_RECOVERY, false); // force download
+ // tune changelog
+ conf.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.ofMebiBytes(10));
+ conf.set(PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofDays(365));
+
+ conf.set(ENABLE_UNALIGNED, true); // speedup
+ conf.set(ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO); // prevent
randomization
+ conf.set(BUFFER_DEBLOAT_ENABLED, false); // prevent randomization
+ conf.set(RESTART_STRATEGY, "none"); // not expecting any failures
+
+ return conf;
+ }
+
+    /**
+     * Waits for a completed checkpoint of the job, cancels the job, surfaces a job failure if
+     * there was one, and returns the path of the latest completed checkpoint.
+     *
+     * @param jobID job to checkpoint and cancel
+     * @return path of the latest completed checkpoint of {@code jobID}
+     * @throws NoSuchElementException if the cluster reports no completed checkpoint for the job
+     */
+    private String checkpointAndCancel(JobID jobID) throws Exception {
+        waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
+        cluster.getClusterClient().cancel(jobID).get();
+        checkStatus(jobID);
+        // The supplier returns the exception (rather than throwing it itself), which is the
+        // contract of Optional#orElseThrow and lets the exception type be inferred.
+        return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
+                .orElseThrow(() -> new NoSuchElementException("No checkpoint was created yet"));
+    }
+
+ private void checkStatus(JobID jobID) throws InterruptedException,
ExecutionException {
+ if
(cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState())
{
+ cluster.getClusterClient()
+ .requestJobResult(jobID)
+ .get()
+ .getSerializedThrowable()
+ .ifPresent(
+ serializedThrowable -> {
+ throw new
RuntimeException(serializedThrowable);
+ });
+ }
+ }
+
+    /**
+     * A {@link LocalFileSystem} that remembers every path passed to {@link #open(Path)} and fails
+     * with an assertion error when the same path is opened a second time.
+     *
+     * <p>The job runs with parallelism {@code PARALLELISM}, so {@code open} may be called from
+     * several threads at once while the test thread queries {@link #hasOpenedPaths()}; the set of
+     * seen paths is therefore a synchronized set.
+     */
+    private static class OpenOnceFileSystem extends LocalFileSystem {
+        /** Paths that were opened so far; synchronized because it is shared across threads. */
+        private final Set<Path> openedPaths = Collections.synchronizedSet(new HashSet<>());
+
+        /**
+         * Records {@code f} and delegates to the local file system.
+         *
+         * @throws AssertionError if {@code f} was already opened through this instance
+         */
+        @Override
+        public FSDataInputStream open(Path f) throws IOException {
+            // Set#add is atomic on the synchronized set, so two racing opens of the same path
+            // cannot both observe "not yet present".
+            Assert.assertTrue(f + " was already opened", openedPaths.add(f));
+            return super.open(f);
+        }
+
+        // NOTE(review): presumably reported as distributed so that callers treat the files as
+        // remote rather than local — confirm against the code that queries this flag.
+        @Override
+        public boolean isDistributedFS() {
+            return true;
+        }
+
+        /** Returns whether at least one path was opened through this file system. */
+        private boolean hasOpenedPaths() {
+            return !openedPaths.isEmpty();
+        }
+    }
+
+ private static void registerFileSystem(FileSystem fs, String scheme) {
+ FileSystem.initialize(
+ new Configuration(),
+ new TestingPluginManager(
+ singletonMap(
+ FileSystemFactory.class,
+ Collections.singleton(
+ new FileSystemFactory() {
+ @Override
+ public FileSystem
create(URI fsUri) {
+ return fs;
+ }
+
+ @Override
+ public String getScheme() {
+ return scheme;
+ }
+ })
+ .iterator())));
+ }
+}