This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f9632a07199854c0225bd7f416c038fbf59abe0 Author: fredia <[email protected]> AuthorDate: Tue May 31 15:22:59 2022 +0800 [FLINK-27693][changelog] Support local recovery for non-materialized part --- .../fs/AbstractStateChangeFsUploader.java | 120 ++++++++++++++ .../fs/DuplicatingOutputStreamWithPos.java | 174 +++++++++++++++++++++ .../fs/DuplicatingStateChangeFsUploader.java | 111 +++++++++++++ .../changelog/fs/FsStateChangelogStorage.java | 50 ++++-- .../fs/FsStateChangelogStorageFactory.java | 8 +- .../flink/changelog/fs/FsStateChangelogWriter.java | 111 ++++++++++--- .../flink/changelog/fs/OutputStreamWithPos.java | 54 ++++++- .../flink/changelog/fs/StateChangeFormat.java | 10 +- .../flink/changelog/fs/StateChangeFsUploader.java | 104 ++---------- .../changelog/fs/StateChangeUploadScheduler.java | 34 ++-- .../flink/changelog/fs/StateChangeUploader.java | 33 +++- .../apache/flink/changelog/fs/UploadResult.java | 41 ++++- .../fs/BatchingStateChangeUploadSchedulerTest.java | 6 +- .../changelog/fs/ChangelogStorageMetricsTest.java | 45 ++++-- .../changelog/fs/FsStateChangelogStorageTest.java | 7 +- .../fs/FsStateChangelogWriterSqnTest.java | 6 +- .../changelog/fs/FsStateChangelogWriterTest.java | 25 ++- .../state/ChangelogTaskLocalStateStore.java | 33 +++- .../TaskExecutorStateChangelogStoragesManager.java | 6 +- .../state/changelog/LocalChangelogRegistry.java | 64 ++++++++ .../changelog/LocalChangelogRegistryImpl.java | 118 ++++++++++++++ .../changelog/StateChangelogStorageFactory.java | 6 +- .../changelog/StateChangelogStorageLoader.java | 8 +- .../state/changelog/StateChangelogWriter.java | 21 ++- .../InMemoryStateChangelogStorageFactory.java | 6 +- .../inmemory/InMemoryStateChangelogWriter.java | 15 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 5 +- .../state/ChangelogTaskLocalStateStoreTest.java | 2 + ...kExecutorStateChangelogStoragesManagerTest.java | 60 +++++-- .../changelog/LocalChangelogRegistryTest.java | 56 +++++++ 
.../inmemory/StateChangelogStorageLoaderTest.java | 16 +- .../inmemory/StateChangelogStorageTest.java | 7 +- ...cutorExecutionDeploymentReconciliationTest.java | 14 +- .../taskexecutor/TaskExecutorSlotLifetimeTest.java | 14 ++ .../changelog/ChangelogKeyedStateBackend.java | 28 +++- .../state/changelog/ChangelogTruncateHelper.java | 1 + .../changelog/ChangelogStateBackendTestUtils.java | 4 +- .../state/changelog/ChangelogStateDiscardTest.java | 20 ++- .../state/changelog/StateChangeLoggerTestBase.java | 7 +- .../ChangelogLocalRecoveryITCase.java | 6 +- 40 files changed, 1219 insertions(+), 237 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java new file mode 100644 index 00000000000..3dd3dfcc70f --- /dev/null +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; + +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** Base implementation of StateChangeUploader. */ +public abstract class AbstractStateChangeFsUploader implements StateChangeUploader { + + private final StateChangeFormat format; + private final Clock clock; + private final TaskChangelogRegistry changelogRegistry; + private final BiFunction<Path, Long, StreamStateHandle> handleFactory; + protected final ChangelogStorageMetricGroup metrics; + protected final boolean compression; + protected final int bufferSize; + + public AbstractStateChangeFsUploader( + boolean compression, + int bufferSize, + ChangelogStorageMetricGroup metrics, + TaskChangelogRegistry changelogRegistry, + BiFunction<Path, Long, StreamStateHandle> handleFactory) { + this.format = new StateChangeFormat(); + this.compression = compression; + this.bufferSize = bufferSize; + this.metrics = metrics; + this.clock = SystemClock.getInstance(); + this.changelogRegistry = changelogRegistry; + this.handleFactory = handleFactory; + } + + abstract OutputStreamWithPos prepareStream() throws IOException; + + @Override + public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException { + metrics.getUploadsCounter().inc(); + long start = clock.relativeTimeNanos(); + UploadTasksResult result = uploadInternal(tasks); + metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start); + 
metrics.getUploadSizes().update(result.getStateSize()); + return result; + } + + private UploadTasksResult uploadInternal(Collection<UploadTask> tasks) throws IOException { + try (OutputStreamWithPos stream = prepareStream()) { + final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets = + new HashMap<>(); + for (UploadTask task : tasks) { + tasksOffsets.put(task, format.write(stream, task.changeSets)); + } + StreamStateHandle handle = stream.getHandle(handleFactory); + changelogRegistry.startTracking( + handle, + tasks.stream() + .flatMap(t -> t.getChangeSets().stream()) + .map(StateChangeSet::getLogId) + .collect(Collectors.toSet())); + if (stream instanceof DuplicatingOutputStreamWithPos) { + StreamStateHandle localHandle = + ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(handleFactory); + changelogRegistry.startTracking( + localHandle, + tasks.stream() + .flatMap(t -> t.getChangeSets().stream()) + .map(StateChangeSet::getLogId) + .collect(Collectors.toSet())); + return new UploadTasksResult(tasksOffsets, handle, localHandle); + } + // WARN: streams have to be closed before returning the results + // otherwise JM may receive invalid handles + return new UploadTasksResult(tasksOffsets, handle); + } catch (IOException e) { + metrics.getUploadFailuresCounter().inc(); + try (Closer closer = Closer.create()) { + closer.register( + () -> { + throw e; + }); + tasks.forEach(cs -> closer.register(() -> cs.fail(e))); + } + } + return null; // closer above throws an exception + } + + protected String generateFileName() { + return UUID.randomUUID().toString(); + } +} diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java new file mode 100644 index 00000000000..b977bd3de0d --- /dev/null +++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.function.BiFunction; + +/** + * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which + * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The + * difference is that this stream does not delete the file when {@link #close()}. 
+ */ +class DuplicatingOutputStreamWithPos extends OutputStreamWithPos { + private static final Logger LOG = LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class); + + private OutputStream secondaryStream; + private OutputStream originalSecondaryStream; + private final Path secondaryPath; + + /** + * Stores a potential exception that occurred while interacting with {@link #secondaryStream}. + */ + private Exception secondaryStreamException; + + public DuplicatingOutputStreamWithPos( + OutputStream primaryStream, + Path primaryPath, + OutputStream secondaryStream, + Path secondaryPath) { + super(primaryStream, primaryPath); + this.secondaryStream = Preconditions.checkNotNull(secondaryStream); + this.originalSecondaryStream = Preconditions.checkNotNull(secondaryStream); + this.secondaryPath = Preconditions.checkNotNull(secondaryPath); + } + + @Override + public void wrap(boolean compression, int bufferSize) throws IOException { + super.wrap(compression, bufferSize); + this.secondaryStream = wrapInternal(compression, bufferSize, this.originalSecondaryStream); + } + + @Override + public void write(int b) throws IOException { + outputStream.write(b); + if (secondaryStreamException == null) { + try { + secondaryStream.write(b); + } catch (Exception ex) { + handleSecondaryStreamOnException(ex); + } + } + pos++; + } + + @Override + public void write(byte[] b) throws IOException { + outputStream.write(b); + if (secondaryStreamException == null) { + try { + secondaryStream.write(b); + } catch (Exception ex) { + LOG.warn("Exception encountered during write to secondary stream"); + handleSecondaryStreamOnException(ex); + } + } + pos += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputStream.write(b, off, len); + if (secondaryStreamException == null) { + try { + secondaryStream.write(b, off, len); + } catch (Exception ex) { + LOG.warn("Exception encountered during writing to secondary stream"); + 
handleSecondaryStreamOnException(ex); + } + } + pos += len; + } + + @Override + public void flush() throws IOException { + outputStream.flush(); + if (secondaryStreamException == null) { + try { + secondaryStream.flush(); + } catch (Exception ex) { + LOG.warn("Exception encountered during flushing secondary stream"); + handleSecondaryStreamOnException(ex); + } + } + } + + @Override + public void close() throws IOException { + Exception exCollector = null; + + try { + super.close(); + } catch (Exception closeEx) { + exCollector = ExceptionUtils.firstOrSuppressed(closeEx, exCollector); + } + + if (secondaryStreamException == null) { + try { + secondaryStream.close(); + originalSecondaryStream.close(); + } catch (Exception closeEx) { + getSecondaryPath().getFileSystem().delete(getSecondaryPath(), true); + handleSecondaryStreamOnException(closeEx); + } + } + + if (exCollector != null) { + throw new IOException("Exception while closing duplicating stream.", exCollector); + } + } + + private void handleSecondaryStreamOnException(Exception ex) { + + Preconditions.checkState( + secondaryStreamException == null, + "Secondary stream already failed from previous exception!"); + try { + secondaryStream.close(); + } catch (Exception closeEx) { + ex = ExceptionUtils.firstOrSuppressed(closeEx, ex); + } + + secondaryStreamException = Preconditions.checkNotNull(ex); + } + + public Path getSecondaryPath() { + return secondaryPath; + } + + public StreamStateHandle getSecondaryHandle( + BiFunction<Path, Long, StreamStateHandle> handleFactory) throws IOException { + if (secondaryStreamException == null) { + return handleFactory.apply(secondaryPath, this.pos); + } else { + throw new IOException( + "Secondary stream previously failed exceptionally", secondaryStreamException); + } + } +} diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java new file mode 100644 index 00000000000..cf99ed17683 --- /dev/null +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.flink.changelog.fs.StateChangeFsUploader.PATH_SUB_DIR; +import static org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLocalTaskOwnedDirectory; + +/** + * A StateChangeFsUploader implementation that writes the changes to remote and local. + * <li>Local dstl files are only managed by TM side, {@link LocalChangelogRegistry}, {@link + * TaskChangelogRegistry} and {@link ChangelogTaskLocalStateStore} are responsible for managing + * them. + * <li>Remote dstl files are managed by TM side and JM side, {@link TaskChangelogRegistry} is + * responsible for TM side, and {@link SharedStateRegistry} is responsible for JM side. + * + * <p>The total discard logic of local dstl files is: + * + * <ol> + * <li>Register files to {@link TaskChangelogRegistry#startTracking} on {@link #upload}. + * <li>Store the meta of files into {@link ChangelogTaskLocalStateStore} by + * AsyncCheckpointRunnable#reportCompletedSnapshotStates(). + * <li>Pass control of the file to {@link LocalChangelogRegistry#register} when + * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous + * checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at + * the same time. 
+ * <li>When ChangelogTruncateHelper#materialized() or + * ChangelogTruncateHelper#checkpointSubsumed() is called, {@link + * TaskChangelogRegistry#notUsed} is responsible for deleting local files. + * <li>When one checkpoint is aborted, the dstl files of this checkpoint will be deleted by + * {@link LocalChangelogRegistry#prune} in {@link FsStateChangelogWriter#reset}. + * </ol> + */ +public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader { + + private static final Logger LOG = + LoggerFactory.getLogger(DuplicatingStateChangeFsUploader.class); + + private final Path basePath; + private final FileSystem fileSystem; + private final LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider; + private final JobID jobID; + + public DuplicatingStateChangeFsUploader( + JobID jobID, + Path basePath, + FileSystem fileSystem, + boolean compression, + int bufferSize, + ChangelogStorageMetricGroup metrics, + TaskChangelogRegistry changelogRegistry, + LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) { + super(compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new); + this.basePath = + new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR)); + this.fileSystem = fileSystem; + this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider; + this.jobID = jobID; + } + + @Override + public OutputStreamWithPos prepareStream() throws IOException { + final String fileName = generateFileName(); + LOG.debug("upload tasks to {}", fileName); + Path path = new Path(basePath, fileName); + FSDataOutputStream primaryStream = fileSystem.create(path, WriteMode.NO_OVERWRITE); + Path localPath = + new Path( + getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID), + fileName); + FSDataOutputStream secondaryStream = + localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE); + DuplicatingOutputStreamWithPos outputStream = + new DuplicatingOutputStreamWithPos(primaryStream, path, 
secondaryStream, localPath); + outputStream.wrap(this.compression, this.bufferSize); + return outputStream; + } + + @Override + public void close() throws Exception {} +} diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java index de7718521a0..5ba18f701ea 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java @@ -26,16 +26,22 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; +import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_DISCARD_THREADS; @@ -62,26 +68,42 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery private final TaskChangelogRegistry changelogRegistry; + @Nullable private LocalChangelogRegistry localChangelogRegistry = LocalChangelogRegistry.NO_OP; + + /** The configuration for local recovery. 
*/ + @Nonnull private final LocalRecoveryConfig localRecoveryConfig; + public FsStateChangelogStorage( - JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup) + JobID jobID, + Configuration config, + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) throws IOException { - this(jobID, config, metricGroup, defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS))); + this( + jobID, + config, + metricGroup, + defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)), + localRecoveryConfig); } public FsStateChangelogStorage( JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup, - TaskChangelogRegistry changelogRegistry) + TaskChangelogRegistry changelogRegistry, + LocalRecoveryConfig localRecoveryConfig) throws IOException { this( fromConfig( jobID, config, new ChangelogStorageMetricGroup(metricGroup), - changelogRegistry), + changelogRegistry, + localRecoveryConfig), config.get(PREEMPTIVE_PERSIST_THRESHOLD).getBytes(), - changelogRegistry); + changelogRegistry, + localRecoveryConfig); } @VisibleForTesting @@ -91,7 +113,8 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery boolean compression, int bufferSize, ChangelogStorageMetricGroup metricGroup, - TaskChangelogRegistry changelogRegistry) + TaskChangelogRegistry changelogRegistry, + LocalRecoveryConfig localRecoveryConfig) throws IOException { this( directScheduler( @@ -104,18 +127,25 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery metricGroup, changelogRegistry)), PREEMPTIVE_PERSIST_THRESHOLD.defaultValue().getBytes(), - changelogRegistry); + changelogRegistry, + localRecoveryConfig); } @VisibleForTesting public FsStateChangelogStorage( StateChangeUploadScheduler uploader, long preEmptivePersistThresholdInBytes, - TaskChangelogRegistry changelogRegistry) { + TaskChangelogRegistry changelogRegistry, + LocalRecoveryConfig localRecoveryConfig) { 
super(ChangelogStreamHandleReader.DIRECT_READER); this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes; this.changelogRegistry = changelogRegistry; this.uploader = uploader; + this.localRecoveryConfig = localRecoveryConfig; + if (localRecoveryConfig.isLocalRecoveryEnabled()) { + this.localChangelogRegistry = + new LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor()); + } } @Override @@ -129,7 +159,9 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery uploader, preEmptivePersistThresholdInBytes, mailboxExecutor, - changelogRegistry); + changelogRegistry, + localRecoveryConfig, + localChangelogRegistry); } @Override diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java index c81931241a5..edb412e3ea4 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory; import org.apache.flink.runtime.state.changelog.StateChangelogStorageView; @@ -47,9 +48,12 @@ public class FsStateChangelogStorageFactory implements StateChangelogStorageFact @Override public StateChangelogStorage<?> createStorage( - JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) + JobID jobID, + Configuration configuration, + 
TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) throws IOException { - return new FsStateChangelogStorage(jobID, configuration, metricGroup); + return new FsStateChangelogStorage(jobID, configuration, metricGroup, localRecoveryConfig); } @Override diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index e6b5b583885..2481fdcc2e4 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -22,8 +22,11 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.SequenceNumberRange; import org.apache.flink.runtime.state.changelog.StateChange; @@ -32,6 +35,7 @@ import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -139,19 +143,28 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl private final TaskChangelogRegistry changelogRegistry; + /** The configuration for local recovery. 
*/ + @Nonnull private final LocalRecoveryConfig localRecoveryConfig; + + private final LocalChangelogRegistry localChangelogRegistry; + FsStateChangelogWriter( UUID logId, KeyGroupRange keyGroupRange, StateChangeUploadScheduler uploader, long preEmptivePersistThresholdInBytes, MailboxExecutor mailboxExecutor, - TaskChangelogRegistry changelogRegistry) { + TaskChangelogRegistry changelogRegistry, + LocalRecoveryConfig localRecoveryConfig, + LocalChangelogRegistry localChangelogRegistry) { this.logId = logId; this.keyGroupRange = keyGroupRange; this.uploader = uploader; this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes; this.mailboxExecutor = mailboxExecutor; this.changelogRegistry = changelogRegistry; + this.localRecoveryConfig = localRecoveryConfig; + this.localChangelogRegistry = localChangelogRegistry; } @Override @@ -185,8 +198,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl } @Override - public CompletableFuture<ChangelogStateHandleStreamImpl> persist(SequenceNumber from) - throws IOException { + public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist( + SequenceNumber from) throws IOException { LOG.debug( "persist {} starting from sqn {} (incl.), active sqn: {}", logId, @@ -195,8 +208,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl return persistInternal(from); } - private CompletableFuture<ChangelogStateHandleStreamImpl> persistInternal(SequenceNumber from) - throws IOException { + private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistInternal( + SequenceNumber from) throws IOException { ensureCanPersist(from); rollover(); Map<SequenceNumber, StateChangeSet> toUpload = drainTailMap(notUploaded, from); @@ -206,9 +219,11 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber); if (range.size() == 
readyToReturn.size()) { checkState(toUpload.isEmpty()); - return CompletableFuture.completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L)); + return CompletableFuture.completedFuture( + buildSnapshotResult(keyGroupRange, readyToReturn, 0L)); } else { - CompletableFuture<ChangelogStateHandleStreamImpl> future = new CompletableFuture<>(); + CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future = + new CompletableFuture<>(); uploadCompletionListeners.add( new UploadCompletionListener(keyGroupRange, range, readyToReturn, future)); if (!toUpload.isEmpty()) { @@ -262,6 +277,9 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl // uploaded already truncated, i.e. materialized state changes, // or closed changelogRegistry.notUsed(result.streamStateHandle, logId); + if (result.localStreamHandle != null) { + changelogRegistry.notUsed(result.localStreamHandle, logId); + } } } } @@ -314,7 +332,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl } @Override - public void confirm(SequenceNumber from, SequenceNumber to) { + public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) { checkState(from.compareTo(to) <= 0, "Invalid confirm range: [%s,%s)", from, to); checkState( from.compareTo(activeSequenceNumber) <= 0 @@ -329,14 +347,30 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl uploaded.subMap(from, to).values().stream() .map(UploadResult::getStreamStateHandle) .forEach(changelogRegistry::stopTracking); + + // transfer the control of localHandle to localStateRegistry. 
+ uploaded.subMap(from, to).values().stream() + .map(UploadResult::getLocalStreamHandleStateHandle) + .filter(localHandle -> localHandle != null) + .forEach( + localHandle -> { + changelogRegistry.stopTracking(localHandle); + localChangelogRegistry.register(localHandle, checkpointId); + }); + localChangelogRegistry.discardUpToCheckpoint(checkpointId); + } + + @Override + public void subsume(long checkpointId) { + localChangelogRegistry.discardUpToCheckpoint(checkpointId); } @Override - public void reset(SequenceNumber from, SequenceNumber to) { - // do nothing + public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) { + localChangelogRegistry.prune(checkpointId); } - private static ChangelogStateHandleStreamImpl buildHandle( + private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult( KeyGroupRange keyGroupRange, NavigableMap<SequenceNumber, UploadResult> results, long incrementalSize) { @@ -346,12 +380,42 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl tuples.add(Tuple2.of(uploadResult.getStreamStateHandle(), uploadResult.getOffset())); size += uploadResult.getSize(); } - return new ChangelogStateHandleStreamImpl( - tuples, - keyGroupRange, - size, - incrementalSize, - FsStateChangelogStorageFactory.IDENTIFIER); + ChangelogStateHandleStreamImpl jmChangelogStateHandle = + new ChangelogStateHandleStreamImpl( + tuples, + keyGroupRange, + size, + incrementalSize, + FsStateChangelogStorageFactory.IDENTIFIER); + if (localRecoveryConfig.isLocalRecoveryEnabled()) { + size = 0; + List<Tuple2<StreamStateHandle, Long>> localTuples = new ArrayList<>(); + for (UploadResult uploadResult : results.values()) { + if (uploadResult.getLocalStreamHandleStateHandle() != null) { + localTuples.add( + Tuple2.of( + uploadResult.getLocalStreamHandleStateHandle(), + uploadResult.getLocalOffset())); + size += uploadResult.getSize(); + } + } + ChangelogStateHandleStreamImpl localChangelogStateHandle = null; + if 
(localTuples.size() == tuples.size()) { + localChangelogStateHandle = + new ChangelogStateHandleStreamImpl( + localTuples, + keyGroupRange, + size, + 0L, + FsStateChangelogStorageFactory.IDENTIFIER); + return SnapshotResult.withLocalState( + jmChangelogStateHandle, localChangelogStateHandle); + + } else { + LOG.warn("local handles are different from remote"); + } + } + return SnapshotResult.of(jmChangelogStateHandle); } @VisibleForTesting @@ -378,9 +442,10 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl } } - private static final class UploadCompletionListener { + private final class UploadCompletionListener { private final NavigableMap<SequenceNumber, UploadResult> uploaded; - private final CompletableFuture<ChangelogStateHandleStreamImpl> completionFuture; + private final CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> + completionFuture; private final KeyGroupRange keyGroupRange; private final SequenceNumberRange changeRange; @@ -388,7 +453,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl KeyGroupRange keyGroupRange, SequenceNumberRange changeRange, Map<SequenceNumber, UploadResult> uploaded, - CompletableFuture<ChangelogStateHandleStreamImpl> completionFuture) { + CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> + completionFuture) { checkArgument( !changeRange.isEmpty(), "Empty change range not allowed: %s", changeRange); this.uploaded = new TreeMap<>(uploaded); @@ -405,7 +471,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl incrementalSize += uploadResult.getSize(); if (uploaded.size() == changeRange.size()) { completionFuture.complete( - buildHandle(keyGroupRange, uploaded, incrementalSize)); + buildSnapshotResult(keyGroupRange, uploaded, incrementalSize)); return true; } } @@ -440,6 +506,9 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl LOG.trace("Uploaded state to discard: {}", 
notUsedState); for (UploadResult result : notUsedState.values()) { changelogRegistry.notUsed(result.streamStateHandle, logId); + if (result.localStreamHandle != null) { + changelogRegistry.notUsed(result.localStreamHandle, logId); + } } } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java index b6be3a766a3..89d71f6dde2 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java @@ -17,15 +17,46 @@ package org.apache.flink.changelog.fs; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; +import org.apache.flink.runtime.state.StreamCompressionDecorator; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; +import org.apache.flink.util.Preconditions; + +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.util.function.BiFunction; class OutputStreamWithPos extends OutputStream { - private final OutputStream outputStream; - private long pos; + protected final Path path; + protected OutputStream outputStream; + protected long pos; + protected boolean compression; + protected final OutputStream originalStream; + + public OutputStreamWithPos(OutputStream outputStream, Path path) { + this.outputStream = Preconditions.checkNotNull(outputStream); + this.originalStream = Preconditions.checkNotNull(outputStream); + this.path = Preconditions.checkNotNull(path); + this.pos = 0; + this.compression = false; + } + + protected OutputStream wrapInternal(boolean compression, int bufferSize, OutputStream fsStream) + throws IOException { + fsStream.write(compression ? 
1 : 0); + StreamCompressionDecorator instance = + compression + ? SnappyStreamCompressionDecorator.INSTANCE + : UncompressedStreamCompressionDecorator.INSTANCE; + return new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize); + } - public OutputStreamWithPos(OutputStream outputStream) { - this.outputStream = outputStream; + public void wrap(boolean compression, int bufferSize) throws IOException { + this.compression = compression; + this.outputStream = wrapInternal(compression, bufferSize, this.originalStream); } @Override @@ -53,10 +84,23 @@ class OutputStreamWithPos extends OutputStream { @Override public void close() throws IOException { - outputStream.close(); + try { + outputStream.close(); + originalStream.close(); + } catch (IOException e) { + getPath().getFileSystem().delete(getPath(), true); + } } public long getPos() { return pos; } + + public Path getPath() { + return path; + } + + public StreamStateHandle getHandle(BiFunction<Path, Long, StreamStateHandle> handleFactory) { + return handleFactory.apply(path, this.pos); + } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java index 7a672da4ba4..58d142305cd 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java @@ -18,6 +18,7 @@ package org.apache.flink.changelog.fs; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.util.CloseableIterator; @@ -45,17 +46,18 @@ import static java.util.Comparator.comparing; public class StateChangeFormat { private static final Logger LOG = 
LoggerFactory.getLogger(StateChangeFormat.class); - Map<StateChangeSet, Long> write(OutputStreamWithPos os, Collection<StateChangeSet> changeSets) - throws IOException { + Map<StateChangeSet, Tuple2<Long, Long>> write( + OutputStreamWithPos os, Collection<StateChangeSet> changeSets) throws IOException { List<StateChangeSet> sorted = new ArrayList<>(changeSets); // using sorting instead of bucketing for simplicity sorted.sort( comparing(StateChangeSet::getLogId) .thenComparing(StateChangeSet::getSequenceNumber)); DataOutputViewStreamWrapper dataOutput = new DataOutputViewStreamWrapper(os); - Map<StateChangeSet, Long> pendingResults = new HashMap<>(); + Map<StateChangeSet, Tuple2<Long, Long>> pendingResults = new HashMap<>(); for (StateChangeSet changeSet : sorted) { - pendingResults.put(changeSet, os.getPos()); + long pos = os.getPos(); + pendingResults.put(changeSet, Tuple2.of(pos, pos)); writeChangeSet(dataOutput, changeSet.getChanges()); } return pendingResults; diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java index 8a37a07f460..e9e9662c65f 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java @@ -19,52 +19,29 @@ package org.apache.flink.changelog.fs; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; -import org.apache.flink.runtime.state.StreamCompressionDecorator; 
import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import org.apache.flink.runtime.state.filesystem.FileStateHandle; -import org.apache.flink.util.clock.Clock; -import org.apache.flink.util.clock.SystemClock; - -import org.apache.flink.shaded.guava30.com.google.common.io.Closer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; import java.util.function.BiFunction; -import java.util.stream.Collectors; - -import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE; /** * A synchronous {@link StateChangeUploadScheduler} implementation that uploads the changes using * {@link FileSystem}. */ -public class StateChangeFsUploader implements StateChangeUploader { +public class StateChangeFsUploader extends AbstractStateChangeFsUploader { private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class); @VisibleForTesting public static final String PATH_SUB_DIR = "dstl"; private final Path basePath; private final FileSystem fileSystem; - private final StateChangeFormat format; - private final boolean compression; - private final int bufferSize; - private final ChangelogStorageMetricGroup metrics; - private final Clock clock; - private final TaskChangelogRegistry changelogRegistry; - private final BiFunction<Path, Long, StreamStateHandle> handleFactory; @VisibleForTesting public StateChangeFsUploader( @@ -95,16 +72,10 @@ public class StateChangeFsUploader implements StateChangeUploader { ChangelogStorageMetricGroup metrics, TaskChangelogRegistry changelogRegistry, BiFunction<Path, Long, StreamStateHandle> handleFactory) { + super(compression, bufferSize, metrics, changelogRegistry, handleFactory); this.basePath = new Path(basePath, String.format("%s/%s", jobID.toHexString(), 
PATH_SUB_DIR)); this.fileSystem = fileSystem; - this.format = new StateChangeFormat(); - this.compression = compression; - this.bufferSize = bufferSize; - this.metrics = metrics; - this.clock = SystemClock.getInstance(); - this.changelogRegistry = changelogRegistry; - this.handleFactory = handleFactory; } @VisibleForTesting @@ -112,70 +83,15 @@ public class StateChangeFsUploader implements StateChangeUploader { return this.basePath; } - public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException { + @Override + public OutputStreamWithPos prepareStream() throws IOException { final String fileName = generateFileName(); - LOG.debug("upload {} tasks to {}", tasks.size(), fileName); + LOG.debug("upload tasks to {}", fileName); Path path = new Path(basePath, fileName); - - try { - return uploadWithMetrics(path, tasks); - } catch (IOException e) { - metrics.getUploadFailuresCounter().inc(); - try (Closer closer = Closer.create()) { - closer.register( - () -> { - throw e; - }); - tasks.forEach(cs -> closer.register(() -> cs.fail(e))); - closer.register(() -> fileSystem.delete(path, true)); - } - } - return null; // closer above throws an exception - } - - private UploadTasksResult uploadWithMetrics(Path path, Collection<UploadTask> tasks) - throws IOException { - metrics.getUploadsCounter().inc(); - long start = clock.relativeTimeNanos(); - UploadTasksResult result = upload(path, tasks); - metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start); - metrics.getUploadSizes().update(result.getStateSize()); - return result; - } - - private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException { - try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) { - fsStream.write(compression ? 
1 : 0); - try (OutputStreamWithPos stream = wrap(fsStream)) { - final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets = new HashMap<>(); - for (UploadTask task : tasks) { - tasksOffsets.put(task, format.write(stream, task.changeSets)); - } - StreamStateHandle handle = handleFactory.apply(path, stream.getPos()); - changelogRegistry.startTracking( - handle, - tasks.stream() - .flatMap(t -> t.getChangeSets().stream()) - .map(StateChangeSet::getLogId) - .collect(Collectors.toSet())); - // WARN: streams have to be closed before returning the results - // otherwise JM may receive invalid handles - return new UploadTasksResult(tasksOffsets, handle); - } - } - } - - private OutputStreamWithPos wrap(FSDataOutputStream fsStream) throws IOException { - StreamCompressionDecorator instance = - compression - ? SnappyStreamCompressionDecorator.INSTANCE - : UncompressedStreamCompressionDecorator.INSTANCE; - return new OutputStreamWithPos( - new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize)); - } - - private String generateFileName() { - return UUID.randomUUID().toString(); + OutputStreamWithPos outputStream = + new OutputStreamWithPos(fileSystem.create(path, WriteMode.NO_OVERWRITE), path); + outputStream.wrap(this.compression, this.bufferSize); + return outputStream; } @Override diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java index 6e3b31a8ab3..19f09a151e6 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; import 
org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.SequenceNumber; import javax.annotation.concurrent.ThreadSafe; @@ -88,21 +89,34 @@ public interface StateChangeUploadScheduler extends AutoCloseable { JobID jobID, ReadableConfig config, ChangelogStorageMetricGroup metricGroup, - TaskChangelogRegistry changelogRegistry) + TaskChangelogRegistry changelogRegistry, + LocalRecoveryConfig localRecoveryConfig) throws IOException { Path basePath = new Path(config.get(BASE_PATH)); long bytes = config.get(UPLOAD_BUFFER_SIZE).getBytes(); checkArgument(bytes <= Integer.MAX_VALUE); int bufferSize = (int) bytes; - StateChangeFsUploader store = - new StateChangeFsUploader( - jobID, - basePath, - basePath.getFileSystem(), - config.get(COMPRESSION_ENABLED), - bufferSize, - metricGroup, - changelogRegistry); + StateChangeUploader store = + localRecoveryConfig.isLocalRecoveryEnabled() + ? new DuplicatingStateChangeFsUploader( + jobID, + basePath, + basePath.getFileSystem(), + config.get(COMPRESSION_ENABLED), + bufferSize, + metricGroup, + changelogRegistry, + localRecoveryConfig + .getLocalStateDirectoryProvider() + .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled())) + : new StateChangeFsUploader( + jobID, + basePath, + basePath.getFileSystem(), + config.get(COMPRESSION_ENABLED), + bufferSize, + metricGroup, + changelogRegistry); BatchingStateChangeUploadScheduler batchingStore = new BatchingStateChangeUploadScheduler( config.get(PERSIST_DELAY).toMillis(), diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java index 1f18fe25d8b..537aa7947e5 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java +++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java @@ -19,10 +19,13 @@ package org.apache.flink.changelog.fs; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collection; import java.util.List; @@ -46,27 +49,45 @@ public interface StateChangeUploader extends AutoCloseable { /** Result of executing one or more {@link UploadTask upload tasks}. */ final class UploadTasksResult { - private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets; + private final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets; private final StreamStateHandle handle; + private final StreamStateHandle localHandle; + + public UploadTasksResult( + Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets, + StreamStateHandle handle) { + this(tasksOffsets, handle, null); + } public UploadTasksResult( - Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) { + Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets, + StreamStateHandle handle, + @Nullable StreamStateHandle localHandle) { this.tasksOffsets = unmodifiableMap(tasksOffsets); this.handle = Preconditions.checkNotNull(handle); + this.localHandle = localHandle; } public void complete() { - for (Map.Entry<UploadTask, Map<StateChangeSet, Long>> entry : tasksOffsets.entrySet()) { + for (Map.Entry<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> entry : + tasksOffsets.entrySet()) { UploadTask task = entry.getKey(); - Map<StateChangeSet, Long> offsets = entry.getValue(); + Map<StateChangeSet, Tuple2<Long, Long>> offsets = entry.getValue(); 
task.complete(buildResults(handle, offsets)); } } private List<UploadResult> buildResults( - StreamStateHandle handle, Map<StateChangeSet, Long> offsets) { + StreamStateHandle handle, Map<StateChangeSet, Tuple2<Long, Long>> offsets) { return offsets.entrySet().stream() - .map(e -> UploadResult.of(handle, e.getKey(), e.getValue())) + .map( + e -> + UploadResult.of( + handle, + localHandle, + e.getKey(), + e.getValue().f0, + e.getValue().f1)) .collect(toList()); } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java index b2caa5c3bd9..385c502b076 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java @@ -21,13 +21,17 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.changelog.SequenceNumber; +import javax.annotation.Nullable; + import static org.apache.flink.util.Preconditions.checkNotNull; /** Result of uploading state changes. 
*/ @Internal public final class UploadResult { public final StreamStateHandle streamStateHandle; + @Nullable public final StreamStateHandle localStreamHandle; public final long offset; + public final long localOffset; public final SequenceNumber sequenceNumber; public final long size; @@ -36,24 +40,55 @@ public final class UploadResult { long offset, SequenceNumber sequenceNumber, long size) { + this(streamStateHandle, null, offset, offset, sequenceNumber, size); + } + + public UploadResult( + StreamStateHandle streamStateHandle, + @Nullable StreamStateHandle localStreamHandle, + long offset, + long localOffset, + SequenceNumber sequenceNumber, + long size) { this.streamStateHandle = checkNotNull(streamStateHandle); + this.localStreamHandle = localStreamHandle; this.offset = offset; + this.localOffset = localOffset; this.sequenceNumber = checkNotNull(sequenceNumber); this.size = size; } - public static UploadResult of(StreamStateHandle handle, StateChangeSet changeSet, long offset) { - return new UploadResult(handle, offset, changeSet.getSequenceNumber(), changeSet.getSize()); + public static UploadResult of( + StreamStateHandle handle, + StreamStateHandle localHandle, + StateChangeSet changeSet, + long offset, + long localOffset) { + return new UploadResult( + handle, + localHandle, + offset, + localOffset, + changeSet.getSequenceNumber(), + changeSet.getSize()); } public StreamStateHandle getStreamStateHandle() { return streamStateHandle; } + public StreamStateHandle getLocalStreamHandleStateHandle() { + return localStreamHandle; + } + public long getOffset() { return offset; } + public long getLocalOffset() { + return localOffset; + } + public SequenceNumber getSequenceNumber() { return sequenceNumber; } @@ -66,6 +101,8 @@ public final class UploadResult { public String toString() { return "streamStateHandle=" + streamStateHandle + + "localStreamHandle" + + localStreamHandle + ", size=" + size + ", offset=" diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java index 21e6e932f8c..d70c3f63082 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java @@ -462,13 +462,13 @@ class BatchingStateChangeUploadSchedulerTest { return uploadsCounter.get(); } - private Map<UploadTask, Map<StateChangeSet, Long>> withOffsets( + private Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> withOffsets( Collection<UploadTask> tasks) { return tasks.stream().collect(toMap(identity(), this::withOffsets)); } - private Map<StateChangeSet, Long> withOffsets(UploadTask task) { - return task.changeSets.stream().collect(toMap(identity(), ign -> 0L)); + private Map<StateChangeSet, Tuple2<Long, Long>> withOffsets(UploadTask task) { + return task.changeSets.stream().collect(toMap(identity(), ign -> Tuple2.of(0L, 0L))); } } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java index 25acff4f606..c86276d0ad7 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java @@ -18,6 +18,7 @@ package org.apache.flink.changelog.fs; */ import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; @@ 
-26,6 +27,7 @@ import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; @@ -69,7 +71,8 @@ public class ChangelogStorageMetricsTest { false, 100, metrics, - TaskChangelogRegistry.NO_OP)) { + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled())) { FsStateChangelogWriter writer = createWriter(storage); int numUploads = 5; for (int i = 0; i < numUploads; i++) { @@ -94,7 +97,8 @@ public class ChangelogStorageMetricsTest { false, 100, metrics, - TaskChangelogRegistry.NO_OP)) { + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled())) { FsStateChangelogWriter writer = createWriter(storage); // upload single byte to infer header size @@ -127,7 +131,8 @@ public class ChangelogStorageMetricsTest { false, 100, metrics, - TaskChangelogRegistry.NO_OP)) { + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled())) { FsStateChangelogWriter writer = createWriter(storage); int numUploads = 5; @@ -178,7 +183,10 @@ public class ChangelogStorageMetricsTest { FsStateChangelogStorage storage = new FsStateChangelogStorage( - batcher, Integer.MAX_VALUE, TaskChangelogRegistry.NO_OP); + batcher, + Integer.MAX_VALUE, + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled()); FsStateChangelogWriter[] writers = new FsStateChangelogWriter[numWriters]; for (int i = 0; i < numWriters; i++) { writers[i] = @@ -230,7 +238,10 @@ public class ChangelogStorageMetricsTest { FsStateChangelogStorage storage = new FsStateChangelogStorage( - batcher, Integer.MAX_VALUE, TaskChangelogRegistry.NO_OP); + batcher, + Integer.MAX_VALUE, + TaskChangelogRegistry.NO_OP, + 
TestLocalRecoveryConfig.disabled()); FsStateChangelogWriter writer = createWriter(storage); try { @@ -272,7 +283,10 @@ public class ChangelogStorageMetricsTest { FsStateChangelogStorage storage = new FsStateChangelogStorage( - batcher, Integer.MAX_VALUE, TaskChangelogRegistry.NO_OP); + batcher, + Integer.MAX_VALUE, + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled()); FsStateChangelogWriter writer = createWriter(storage); try { @@ -335,7 +349,11 @@ public class ChangelogStorageMetricsTest { metrics.getTotalAttemptsPerUpload()), metrics); try (FsStateChangelogStorage storage = - new FsStateChangelogStorage(batcher, Long.MAX_VALUE, TaskChangelogRegistry.NO_OP)) { + new FsStateChangelogStorage( + batcher, + Long.MAX_VALUE, + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled())) { FsStateChangelogWriter writer = createWriter(storage); int numUploads = 11; for (int i = 0; i < numUploads; i++) { @@ -360,7 +378,7 @@ public class ChangelogStorageMetricsTest { @Override public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException { - Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>(); + Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> map = new HashMap<>(); for (UploadTask uploadTask : tasks) { int currentAttempt = 1 + attemptsPerTask.getOrDefault(uploadTask, 0); if (currentAttempt == maxAttempts) { @@ -368,7 +386,10 @@ public class ChangelogStorageMetricsTest { map.put( uploadTask, uploadTask.changeSets.stream() - .collect(Collectors.toMap(Function.identity(), ign -> 0L))); + .collect( + Collectors.toMap( + Function.identity(), + ign -> Tuple2.of(0L, 0L)))); } else { attemptsPerTask.put(uploadTask, currentAttempt); throw new IOException(); @@ -420,12 +441,14 @@ public class ChangelogStorageMetricsTest { } } - Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>(); + Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> map = new HashMap<>(); for (UploadTask uploadTask : tasks) { map.put( 
uploadTask, uploadTask.changeSets.stream() - .collect(Collectors.toMap(Function.identity(), ign -> 0L))); + .collect( + Collectors.toMap( + Function.identity(), ign -> Tuple2.of(0L, 0L)))); } return new UploadTasksResult(map, new EmptyStreamStateHandle()); } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java index 01b45762f5c..03fc8a9b3e4 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java @@ -22,6 +22,7 @@ import org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.Bloc import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -57,7 +58,8 @@ public class FsStateChangelogStorageTest compression, 1024 * 1024 * 10, createUnregisteredChangelogStorageMetricGroup(), - TaskChangelogRegistry.NO_OP); + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled()); } /** @@ -103,7 +105,8 @@ public class FsStateChangelogStorageTest new FsStateChangelogStorage( scheduler, 0, - TaskChangelogRegistry.NO_OP /* persist immediately */) + TaskChangelogRegistry.NO_OP, /* persist immediately */ + TestLocalRecoveryConfig.disabled()) .createWriter( new OperatorID().toString(), KeyGroupRange.of(0, 0), diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java index 926f188bb5c..4ea8f665002 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java @@ -19,6 +19,8 @@ package org.apache.flink.changelog.fs; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.util.function.ThrowingConsumer; @@ -80,7 +82,9 @@ public class FsStateChangelogWriterSqnTest { new TestingStateChangeUploader()), Long.MAX_VALUE, new SyncMailboxExecutor(), - TaskChangelogRegistry.NO_OP)) { + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled(), + LocalChangelogRegistry.NO_OP)) { if (writerSqnTestSettings.withAppend) { append(writer); } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java index b9e6227cf1d..d2a3aa61f79 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java @@ -19,7 +19,10 @@ package org.apache.flink.changelog.fs; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import 
org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.util.function.BiConsumerWithException; @@ -72,12 +75,15 @@ class FsStateChangelogWriterTest { withWriter( (writer, uploader) -> { byte[] bytes = getBytes(); - CompletableFuture<ChangelogStateHandleStreamImpl> future = + CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future = writer.persist(append(writer, bytes)); assertSubmittedOnly(uploader, bytes); uploader.completeUpload(); assertThat( - getOnlyElement(future.get().getHandlesAndOffsets()) + getOnlyElement( + future.get() + .getJobManagerOwnedSnapshot() + .getHandlesAndOffsets()) .f0 .asBytesIfInMemory() .get()) @@ -94,7 +100,7 @@ class FsStateChangelogWriterTest { writer.persist(sqn); uploader.completeUpload(); uploader.reset(); - writer.confirm(sqn, writer.nextSequenceNumber()); + writer.confirm(sqn, writer.nextSequenceNumber(), 1L); writer.persist(sqn); assertNoUpload(uploader, "confirmed changes shouldn't be re-uploaded"); }); @@ -134,7 +140,7 @@ class FsStateChangelogWriterTest { (writer, uploader) -> { byte[] bytes = getBytes(); SequenceNumber sqn = append(writer, bytes); - writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE)); + writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE), Long.MAX_VALUE); uploader.reset(); writer.persist(sqn); assertSubmittedOnly(uploader, bytes); @@ -149,7 +155,9 @@ class FsStateChangelogWriterTest { (writer, uploader) -> { byte[] bytes = getBytes(); SequenceNumber sqn = append(writer, bytes); - CompletableFuture<ChangelogStateHandleStreamImpl> + CompletableFuture< + SnapshotResult< + ChangelogStateHandleStreamImpl>> future = writer.persist(sqn); uploader.failUpload(new RuntimeException("test")); try { @@ -186,7 +194,8 @@ class FsStateChangelogWriterTest { uploader.failUpload(new RuntimeException("test")); uploader.reset(); 
SequenceNumber sqn2 = append(writer, bytes); - CompletableFuture<ChangelogStateHandleStreamImpl> future = writer.persist(sqn2); + CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future = + writer.persist(sqn2); uploader.completeUpload(); future.get(); }); @@ -225,7 +234,9 @@ class FsStateChangelogWriterTest { StateChangeUploadScheduler.directScheduler(uploader), appendPersistThreshold, new SyncMailboxExecutor(), - TaskChangelogRegistry.NO_OP)) { + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled(), + LocalChangelogRegistry.NO_OP)) { test.accept(writer, uploader); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java index a814589d45f..23b2b6fd42a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -36,6 +37,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -44,6 +46,7 @@ import java.util.concurrent.Executor; import java.util.function.LongPredicate; import java.util.stream.Collectors; +import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR; import static org.apache.flink.util.Preconditions.checkState; /** Changelog's implementation of a {@link TaskLocalStateStore}. 
*/ @@ -97,6 +100,21 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl { } } + public static Path getLocalTaskOwnedDirectory( + LocalRecoveryDirectoryProvider provider, JobID jobID) { + File outDir = + provider.selectAllocationBaseDirectory( + (jobID.hashCode() & Integer.MAX_VALUE) + % provider.allocationBaseDirsCount()); + if (!outDir.exists() && !outDir.mkdirs()) { + LOG.error( + "Local state base directory does not exist and could not be created: " + + outDir); + } + return new Path( + String.format("%s/jid_%s", outDir.toURI(), jobID), CHECKPOINT_TASK_OWNED_STATE_DIR); + } + @Override public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) { if (checkpointId < lastCheckpointId) { @@ -137,13 +155,13 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl { discardExecutor.execute( () -> - syncDiscardDirectoryForCollection( + syncDiscardFileForCollection( materializationToRemove.stream() .map(super::getCheckpointDirectory) .collect(Collectors.toList()))); } - private void syncDiscardDirectoryForCollection(Collection<File> toDiscard) { + private void syncDiscardFileForCollection(Collection<File> toDiscard) { for (File directory : toDiscard) { if (directory.exists()) { try { @@ -181,6 +199,17 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl { @Override public CompletableFuture<Void> dispose() { deleteMaterialization(id -> true); + // delete all ChangelogStateHandle in taskowned directory. 
+ discardExecutor.execute( + () -> + syncDiscardFileForCollection( + Collections.singleton( + new File( + getLocalTaskOwnedDirectory( + getLocalRecoveryDirectoryProvider(), + jobID) + .toUri())))); + synchronized (lock) { mapToMaterializationId.clear(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java index 59c3c090bf3..b0d928f7751 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java @@ -89,7 +89,8 @@ public class TaskExecutorStateChangelogStoragesManager { public StateChangelogStorage<?> stateChangelogStorageForJob( @Nonnull JobID jobId, Configuration configuration, - TaskManagerJobMetricGroup metricGroup) + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) throws IOException { synchronized (lock) { if (closed) { @@ -103,7 +104,8 @@ public class TaskExecutorStateChangelogStoragesManager { if (stateChangelogStorage == null) { StateChangelogStorage<?> loaded = - StateChangelogStorageLoader.load(jobId, configuration, metricGroup); + StateChangelogStorageLoader.load( + jobId, configuration, metricGroup, localRecoveryConfig); stateChangelogStorage = Optional.ofNullable(loaded); changelogStoragesByJobId.put(jobId, stateChangelogStorage); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java new file mode 100644 index 00000000000..a2ffb66b84d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.state.StreamStateHandle; + +/** This registry is responsible for deleting changelog's local handles which are not in use. */ +@Internal +public interface LocalChangelogRegistry { + LocalChangelogRegistry NO_OP = + new LocalChangelogRegistry() { + @Override + public void register(StreamStateHandle handle, long checkpointID) {} + + @Override + public void discardUpToCheckpoint(long upTo) {} + + @Override + public void prune(long checkpointID) {} + }; + + /** + * Called upon ChangelogKeyedStateBackend#notifyCheckpointComplete. + * + * @param handle handle to register. + * @param checkpointID latest used checkpointID. + */ + void register(StreamStateHandle handle, long checkpointID); + + /** + * Called upon ChangelogKeyedStateBackend#notifyCheckpointComplete and + * ChangelogKeyedStateBackend#notifyCheckpointSubsumed.
Remote dstl handles are unregistered + * when {@link CompletedCheckpointStore#addCheckpointAndSubsumeOldestOne}, local dstl handles + * are unregistered when the checkpoint completes, because only one checkpoint is kept for local + * recovery. + * + * @param upTo lowest CheckpointID which is still valid. + */ + void discardUpToCheckpoint(long upTo); + + /** + * Called upon ChangelogKeyedStateBackend#notifyCheckpointAborted. + * + * @param checkpointID to abort + */ + void prune(long checkpointID); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java new file mode 100644 index 00000000000..acb8ae318ca --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.PhysicalStateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.stream.Collectors; + +@Internal +public class LocalChangelogRegistryImpl implements LocalChangelogRegistry { + private static final Logger LOG = LoggerFactory.getLogger(LocalChangelogRegistry.class); + /** + * All registered handles. (PhysicalStateHandleID, (handle, checkpointID)) represents a handle + * and the latest checkpoint that refer to this handle. + */ + private final Map<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>> + handleToLastUsedCheckpointID = new ConcurrentHashMap<>(); + + /** Executor for async state deletion. 
*/ + private final Executor asyncDisposalExecutor; + + public LocalChangelogRegistryImpl(Executor ioExecutor) { + this.asyncDisposalExecutor = ioExecutor; + } + + public void register(StreamStateHandle handle, long checkpointID) { + handleToLastUsedCheckpointID.compute( + handle.getStreamStateHandleID(), + (k, v) -> { + if (v == null) { + return Tuple2.of(handle, checkpointID); + } else { + Preconditions.checkState(handle.equals(v.f0)); + return Tuple2.of(handle, Math.max(v.f1, checkpointID)); + } + }); + } + + public void discardUpToCheckpoint(long upTo) { + List<StreamStateHandle> handles = new ArrayList<>(); + synchronized (handleToLastUsedCheckpointID) { + Iterator<Tuple2<StreamStateHandle, Long>> iterator = + handleToLastUsedCheckpointID.values().iterator(); + while (iterator.hasNext()) { + Tuple2<StreamStateHandle, Long> entry = iterator.next(); + if (entry.f1 < upTo) { + handles.add(entry.f0); + iterator.remove(); + } + } + } + for (StreamStateHandle handle : handles) { + scheduleAsyncDelete(handle); + } + } + + public void prune(long checkpointID) { + Set<StreamStateHandle> handles = + handleToLastUsedCheckpointID.values().stream() + .filter(tuple -> tuple.f1 == checkpointID) + .map(tuple -> tuple.f0) + .collect(Collectors.toSet()); + for (StreamStateHandle handle : handles) { + scheduleAsyncDelete(handle); + } + } + + private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { + if (streamStateHandle != null) { + LOG.trace("Scheduled delete of state handle {}.", streamStateHandle); + Runnable discardRunner = + () -> { + try { + streamStateHandle.discardState(); + } catch (Exception exception) { + LOG.warn( + "A problem occurred during asynchronous disposal of a stream handle {}.", + streamStateHandle); + } + }; + try { + asyncDisposalExecutor.execute(discardRunner); + } catch (RejectedExecutionException ex) { + discardRunner.run(); + } + } + } +} diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java index be86a416e6c..37d9091ea30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import java.io.IOException; @@ -36,7 +37,10 @@ public interface StateChangelogStorageFactory { /** Create the storage based on a configuration. */ StateChangelogStorage<?> createStorage( - JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) + JobID jobID, + Configuration configuration, + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) throws IOException; /** Create the storage for recovery. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java index 622c09f9956..ee7d39f8a34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; @@ -87,7 +88,10 @@ public class StateChangelogStorageLoader { @Nullable public static StateChangelogStorage<?> load( - JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) + JobID jobID, + Configuration configuration, + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) throws IOException { final String identifier = configuration @@ -100,7 +104,7 @@ public class StateChangelogStorageLoader { return null; } else { LOG.info("Creating a changelog storage with name '{}'.", identifier); - return factory.createStorage(jobID, configuration, metricGroup); + return factory.createStorage(jobID, configuration, metricGroup, localRecoveryConfig); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java index 0ee4f615a31..756528139d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.SnapshotResult; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -41,13 +42,13 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten /** * Durably persist previously {@link #append(int, byte[]) appended} data starting from the * provided {@link SequenceNumber} and up to the latest change added. After this call, one of - * {@link #confirm(SequenceNumber, SequenceNumber) confirm}, {@link #reset(SequenceNumber, - * SequenceNumber) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must be - * called for the corresponding change set. with reset/truncate/confirm methods? + * {@link #confirm(SequenceNumber, SequenceNumber, long) confirm}, {@link #reset(SequenceNumber, + * SequenceNumber, long) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must + * be called for the corresponding change set. with reset/truncate/confirm methods? * * @param from inclusive */ - CompletableFuture<Handle> persist(SequenceNumber from) throws IOException; + CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from) throws IOException; /** * Truncate this state changelog to free up the resources and collect any garbage. That means: @@ -71,14 +72,22 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten * * @param from inclusive * @param to exclusive + * @param checkpointId to confirm */ - void confirm(SequenceNumber from, SequenceNumber to); + void confirm(SequenceNumber from, SequenceNumber to, long checkpointId); + + /** + * Mark the state changes of the given checkpoint as subsumed. + * + * @param checkpointId + */ + void subsume(long checkpointId); /** * Reset the state the given state changes. 
Called upon abortion so that if requested later then * these changes will be re-uploaded. */ - void reset(SequenceNumber from, SequenceNumber to); + void reset(SequenceNumber from, SequenceNumber to, long checkpointId); /** * Truncate the tail of log and close it. No new appends will be possible. Any appended but not diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java index 786a6a7be29..39ca25fde79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory; import org.apache.flink.runtime.state.changelog.StateChangelogStorageView; @@ -36,7 +37,10 @@ public class InMemoryStateChangelogStorageFactory implements StateChangelogStora @Override public StateChangelogStorage<?> createStorage( - JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) { + JobID jobID, + Configuration configuration, + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) { return new InMemoryStateChangelogStorage(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index 3bd0a969d83..eb07ba47716 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -75,11 +76,14 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang } @Override - public CompletableFuture<InMemoryChangelogStateHandle> persist(SequenceNumber from) { + public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>> persist( + SequenceNumber from) { LOG.debug("Persist after {}", from); Preconditions.checkNotNull(from); return completedFuture( - new InMemoryChangelogStateHandle(collectChanges(from), from, sqn, keyGroupRange)); + SnapshotResult.of( + new InMemoryChangelogStateHandle( + collectChanges(from), from, sqn, keyGroupRange))); } private List<StateChange> collectChanges(SequenceNumber after) { @@ -113,8 +117,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang } @Override - public void confirm(SequenceNumber from, SequenceNumber to) {} + public void confirm(SequenceNumber from, SequenceNumber to, long checkpointID) {} @Override - public void reset(SequenceNumber from, SequenceNumber to) {} + public void subsume(long checkpointId) {} + + @Override + public void reset(SequenceNumber from, SequenceNumber to, long checkpointID) {} } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 26b274f5a5c..f4481ea9d80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -700,7 +700,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { try { changelogStorage = changelogStoragesManager.stateChangelogStorageForJob( - jobId, taskManagerConfiguration.getConfiguration(), jobGroup); + jobId, + taskManagerConfiguration.getConfiguration(), + jobGroup, + localStateStore.getLocalRecoveryConfig()); } catch (IOException e) { throw new TaskSubmissionException(e); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java index 8493064f806..abf058986a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java @@ -131,6 +131,8 @@ public class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTes assertTrue(stateSnapshot2.isDiscarded()); // the materialized part of checkpoint 2 retain, because it still used by checkpoint 3 assertTrue(checkMaterializedDirExists(2)); + // checkpoint 1 retain + assertEquals(stateSnapshot1, taskLocalStateStore.retrieveLocalState(1)); assertTrue(checkMaterializedDirExists(1)); assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java index 
b0efaa9c427..1ad0c7b8231 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java @@ -53,16 +53,25 @@ public class TaskExecutorStateChangelogStoragesManagerTest { JobID jobId1 = new JobID(1L, 1L); StateChangelogStorage<?> storage1 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); StateChangelogStorage<?> storage2 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertEquals(storage1, storage2); JobID jobId2 = new JobID(1L, 2L); StateChangelogStorage<?> storage3 = manager.stateChangelogStorageForJob( - jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId2, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertNotEquals(storage1, storage3); manager.shutdown(); } @@ -79,7 +88,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest { JobID jobId1 = new JobID(1L, 1L); StateChangelogStorage<?> storage1 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertTrue(storage1 instanceof TestStateChangelogStorage); Assert.assertFalse(((TestStateChangelogStorage) storage1).closed); manager.releaseResourcesForJob(jobId1); @@ -87,7 +99,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest { StateChangelogStorage<?> storage2 = 
manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertNotEquals(storage1, storage2); manager.shutdown(); @@ -104,7 +119,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest { JobID jobId1 = new JobID(1L, 1L); StateChangelogStorage<?> storage1 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertNull(storage1); // change configuration, assert the result not change. @@ -113,19 +131,28 @@ public class TaskExecutorStateChangelogStoragesManagerTest { StateChangelogOptions.STATE_CHANGE_LOG_STORAGE.defaultValue()); StateChangelogStorage<?> storage2 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertNull(storage2); JobID jobId2 = new JobID(1L, 2L); StateChangelogStorage<?> storage3 = manager.stateChangelogStorageForJob( - jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId2, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertNotNull(storage3); configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid"); StateChangelogStorage<?> storage4 = manager.stateChangelogStorageForJob( - jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId2, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertNotNull(storage4); Assert.assertEquals(storage3, storage4); @@ -144,14 +171,20 @@ public class 
TaskExecutorStateChangelogStoragesManagerTest { JobID jobId1 = new JobID(1L, 1L); StateChangelogStorage<?> storage1 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertTrue(storage1 instanceof TestStateChangelogStorage); Assert.assertFalse(((TestStateChangelogStorage) storage1).closed); JobID jobId2 = new JobID(1L, 2L); StateChangelogStorage<?> storage2 = manager.stateChangelogStorageForJob( - jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup()); + jobId1, + configuration, + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); Assert.assertTrue(storage2 instanceof TestStateChangelogStorage); Assert.assertFalse(((TestStateChangelogStorage) storage2).closed); @@ -207,7 +240,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest { @Override public StateChangelogStorage<?> createStorage( - JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) { + JobID jobID, + Configuration configuration, + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) { return new TestStateChangelogStorage(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java new file mode 100644 index 00000000000..d28c33e603a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.runtime.state.TestingStreamStateHandle; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** {@link LocalChangelogRegistryImpl}'s test. */ +public class LocalChangelogRegistryTest extends TestLogger { + + @Test + public void testRegistryNormal() { + LocalChangelogRegistry localStateRegistry = + new LocalChangelogRegistryImpl(Executors.directExecutor()); + TestingStreamStateHandle handle1 = new TestingStreamStateHandle(); + TestingStreamStateHandle handle2 = new TestingStreamStateHandle(); + // checkpoint 1: handle1, handle2 + localStateRegistry.register(handle1, 1); + localStateRegistry.register(handle2, 1); + + // checkpoint 2: handle2, handle3 + TestingStreamStateHandle handle3 = new TestingStreamStateHandle(); + localStateRegistry.register(handle2, 2); + localStateRegistry.register(handle3, 2); + + localStateRegistry.discardUpToCheckpoint(2); + assertTrue(handle1.isDisposed()); + assertFalse(handle2.isDisposed()); + + localStateRegistry.discardUpToCheckpoint(3); + assertTrue(handle2.isDisposed()); + assertTrue(handle3.isDisposed()); + } +} diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java index 6c091eeb546..94782f9dda8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java @@ -24,6 +24,8 @@ import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; @@ -55,7 +57,8 @@ public class StateChangelogStorageLoaderTest { StateChangelogStorageLoader.load( JobID.generate(), new Configuration(), - createUnregisteredTaskManagerJobMetricGroup())); + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled())); } @Test @@ -66,7 +69,8 @@ public class StateChangelogStorageLoaderTest { JobID.generate(), new Configuration() .set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "not_exist"), - createUnregisteredTaskManagerJobMetricGroup())); + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled())); } @Test @@ -79,7 +83,8 @@ public class StateChangelogStorageLoaderTest { StateChangelogStorageLoader.load( JobID.generate(), new Configuration(), - createUnregisteredTaskManagerJobMetricGroup()); + createUnregisteredTaskManagerJobMetricGroup(), + TestLocalRecoveryConfig.disabled()); 
assertTrue(loaded instanceof TestStateChangelogStorage); } @@ -120,7 +125,10 @@ public class StateChangelogStorageLoaderTest { @Override public StateChangelogStorage<?> createStorage( - JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) { + JobID jobID, + Configuration configuration, + TaskManagerJobMetricGroup metricGroup, + LocalRecoveryConfig localRecoveryConfig) { return new TestStateChangelogStorage(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java index a130ba79723..f0c92ab55ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; @@ -97,10 +98,10 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> { writer.nextSequenceNumber(); } - T handle = writer.persist(prev).get(); + SnapshotResult<T> res = writer.persist(prev).get(); + T jmHandle = res.getJobManagerOwnedSnapshot(); StateChangelogHandleReader<T> reader = client.createReader(); - - assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader)); + assertByteMapsEqual(appendsByKeyGroup, extract(jmHandle, reader)); } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java index 0739c96e271..60d282da8f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java @@ -52,12 +52,15 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.testutils.TestFileUtils; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.Reference; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.concurrent.FutureUtils; import org.junit.After; @@ -65,7 +68,9 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.util.Collections; @@ -108,6 +113,8 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + @Before public void setup() { 
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); @@ -136,13 +143,18 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>(); final TestingResourceManagerGateway testingResourceManagerGateway = setupResourceManagerGateway(initialSlotReportFuture); - + final TaskExecutorLocalStateStoresManager localStateStoresManager = + new TaskExecutorLocalStateStoresManager( + false, + Reference.owned(new File[] {tmp.newFolder()}), + Executors.directExecutor()); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskSlotTable( TaskSlotUtils.createTaskSlotTable( 1, timeout, EXECUTOR_RESOURCE.getExecutor())) .setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build()) + .setTaskStateManager(localStateStoresManager) .build(); final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java index f91f9e72b37..35f099b4215 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -51,14 +52,18 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.testutils.TestFileUtils; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.Reference; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.FunctionUtils; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.util.concurrent.ArrayBlockingQueue; @@ -84,6 +89,8 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger { public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + @Before public void setup() { UserClassLoaderExtractingInvokable.clearQueue(); @@ -221,6 +228,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger { TaskSlotUtils.createTaskSlotTable( 1, EXECUTOR_RESOURCE.getExecutor())) .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation) + .setTaskStateManager(createTaskExecutorLocalStateStoresManager()) .build(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, new TestingHeartbeatServices(), @@ -231,6 +239,12 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger { new TestingTaskExecutorPartitionTracker()); } + private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() + throws IOException { + return new TaskExecutorLocalStateStoresManager( + false, Reference.owned(new File[] {tmp.newFolder()}), Executors.directExecutor()); + } + public static final class UserClassLoaderExtractingInvokable extends AbstractInvokable { private static 
BlockingQueue<ClassLoader> userCodeClassLoaders = diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index eb5792dc23a..9c7fe9d57bb 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -417,22 +417,34 @@ public class ChangelogKeyedStateBackend<K> private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult( long checkpointId, - ChangelogStateHandle delta, + SnapshotResult<? extends ChangelogStateHandle> delta, ChangelogSnapshotState changelogStateBackendStateCopy) { // collections don't change once started and handles are immutable List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized()); long persistedSizeOfThisCheckpoint = 0L; - if (delta != null && delta.getStateSize() > 0) { - prevDeltaCopy.add(delta); - persistedSizeOfThisCheckpoint += delta.getCheckpointedSize(); + if (delta != null + && delta.getJobManagerOwnedSnapshot() != null + && delta.getJobManagerOwnedSnapshot().getStateSize() > 0) { + prevDeltaCopy.add(delta.getJobManagerOwnedSnapshot()); + persistedSizeOfThisCheckpoint += + delta.getJobManagerOwnedSnapshot().getCheckpointedSize(); } if (prevDeltaCopy.isEmpty() && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) { return SnapshotResult.empty(); - } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) { + } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty() + || delta.getTaskLocalSnapshot() != null) { + List<ChangelogStateHandle> localDeltaCopy = + new 
ArrayList<>( + changelogStateBackendStateCopy.getLocalRestoredNonMaterialized()); + if (delta != null + && delta.getTaskLocalSnapshot() != null + && delta.getTaskLocalSnapshot().getStateSize() > 0) { + localDeltaCopy.add(delta.getTaskLocalSnapshot()); + } ChangelogStateBackendHandleImpl jmHandle = new ChangelogStateBackendHandleImpl( changelogStateBackendStateCopy.getMaterializedSnapshot(), @@ -445,7 +457,7 @@ public class ChangelogKeyedStateBackend<K> jmHandle, new ChangelogStateBackendLocalHandle( changelogStateBackendStateCopy.getLocalMaterializedSnapshot(), - prevDeltaCopy, + localDeltaCopy, jmHandle)); } else { return SnapshotResult.of( @@ -540,7 +552,7 @@ public class ChangelogKeyedStateBackend<K> // newer upload instead of the previous one. This newer upload could then be re-used // while in fact JM has discarded its results. // This might change if the log ownership changes (the method won't likely be needed). - stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo); + stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo, checkpointId); } Long materializationID = materializationIdByCheckpointId.remove(checkpointId); if (materializationID != null) { @@ -559,7 +571,7 @@ public class ChangelogKeyedStateBackend<K> // change if it is not relevant anymore. Otherwise, it could DISCARD a newer upload // instead of the previous one. Rely on truncation for the cleanup in this case. // This might change if the log ownership changes (the method won't likely be needed). 
- stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo); + stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo, checkpointId); } // TODO: Consider notifying nested state backend about checkpoint abortion (FLINK-25850) } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java index 6a6b71868b5..12b5d3388f2 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java @@ -70,6 +70,7 @@ class ChangelogTruncateHelper { subsumedUpTo = sqn; checkpointedUpTo.headMap(checkpointId, true).clear(); truncate(); + stateChangelogWriter.subsume(checkpointId); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java index 6a3a41bccd1..c26a736ddb9 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendTestBase; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; import 
org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -190,7 +191,8 @@ public class ChangelogStateBackendTestUtils { new ChangelogStorageMetricGroup( UnregisteredMetricGroups .createUnregisteredTaskManagerJobMetricGroup()), - TaskChangelogRegistry.NO_OP)) + TaskChangelogRegistry.NO_OP, + TestLocalRecoveryConfig.disabled())) .build(); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java index 26855819f1e..1e69afe2f9b 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.changelog.fs.FsStateChangelogStorage; import org.apache.flink.changelog.fs.StateChangeSet; import org.apache.flink.changelog.fs.StateChangeUploadScheduler; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestingStreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; @@ -128,7 +130,8 
@@ public class ChangelogStateDiscardTest { TaskChangelogRegistry.defaultChangelogRegistry(directExecutor()); final TestingUploadScheduler scheduler = new TestingUploadScheduler(registry); singleBackendTest( - new FsStateChangelogStorage(scheduler, 0L, registry), + new FsStateChangelogStorage( + scheduler, 0L, registry, TestLocalRecoveryConfig.disabled()), (backend, writer) -> { changeAndLogRandomState(backend, scheduler.uploads::size); truncate(writer, backend); @@ -185,7 +188,8 @@ public class ChangelogStateDiscardTest { TaskChangelogRegistry.defaultChangelogRegistry(directExecutor()); final TestingUploadScheduler scheduler = new TestingUploadScheduler(registry); final StateChangelogStorage<?> storage = - new FsStateChangelogStorage(scheduler, 0, registry); + new FsStateChangelogStorage( + scheduler, 0, registry, TestLocalRecoveryConfig.disabled()); final StateChangelogWriter<?> w1 = storage.createWriter("test-operator-1", kgRange, new SyncMailboxExecutor()), w2 = storage.createWriter("test-operator-2", kgRange, new SyncMailboxExecutor()); @@ -222,7 +226,10 @@ public class ChangelogStateDiscardTest { long preEmptivePersistThresholdInBytes = 0L; // flush ASAP singleBackendTest( new FsStateChangelogStorage( - directScheduler(uploader), preEmptivePersistThresholdInBytes, registry), + directScheduler(uploader), + preEmptivePersistThresholdInBytes, + registry, + TestLocalRecoveryConfig.disabled()), (backend, writer) -> testCase.accept(backend, writer, uploader)); } @@ -367,14 +374,15 @@ public class ChangelogStateDiscardTest { results.add(handle); // todo: avoid making StateChangeSet and its internals public? 
// todo: make the contract more explicit or extract common code - Map<UploadTask, Map<StateChangeSet, Long>> taskOffsets = + Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> taskOffsets = tasks.stream().collect(toMap(identity(), this::mapOffsets)); tasks.forEach(task -> startTracking(registry, handle, task)); return new UploadTasksResult(taskOffsets, handle); } - private Map<StateChangeSet, Long> mapOffsets(UploadTask task) { - return task.getChangeSets().stream().collect(Collectors.toMap(identity(), ign -> 0L)); + private Map<StateChangeSet, Tuple2<Long, Long>> mapOffsets(UploadTask task) { + return task.getChangeSets().stream() + .collect(Collectors.toMap(identity(), ign -> Tuple2.of(0L, 0L))); } @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java index 70604262496..9813409475f 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java @@ -128,10 +128,13 @@ abstract class StateChangeLoggerTestBase<Namespace> { public void truncate(SequenceNumber to) {} @Override - public void confirm(SequenceNumber from, SequenceNumber to) {} + public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {} @Override - public void reset(SequenceNumber from, SequenceNumber to) {} + public void subsume(long checkpointId) {} + + @Override + public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {} @Override public void truncateAndClose(SequenceNumber from) {} diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java index 1f770de6066..3b0693cd3ea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java @@ -150,11 +150,9 @@ public class ChangelogLocalRecoveryITCase extends TestLogger { () -> miniCluster .getExecutionGraph(firstJobGraph.getJobID()) - .get(10000, TimeUnit.SECONDS), + .get(500, TimeUnit.SECONDS), false); - - waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false); - miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get(); + miniCluster.triggerCheckpoint(firstJobGraph.getJobID()); } private StreamExecutionEnvironment getEnv(
