This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aef75be34d99c737f5c565703a971027ac44f855 Author: fredia <[email protected]> AuthorDate: Tue May 24 15:23:38 2022 +0800 [FLINK-27692][changelog] Support local recovery for materialized part --- .../flink/runtime/minicluster/MiniCluster.java | 3 +- .../state/ChangelogTaskLocalStateStore.java | 209 ++++++++++++++++++++ .../state/TaskExecutorLocalStateStoresManager.java | 54 ++++-- .../runtime/state/TaskLocalStateStoreImpl.java | 146 +++++++------- .../ChangelogStateBackendLocalHandle.java | 116 +++++++++++ .../flink/runtime/taskexecutor/TaskExecutor.java | 4 +- .../state/ChangelogTaskLocalStateStoreTest.java | 214 +++++++++++++++++++++ .../TaskExecutorLocalStateStoresManagerTest.java | 30 ++- .../runtime/state/TaskLocalStateStoreImplTest.java | 18 +- .../changelog/ChangelogKeyedStateBackend.java | 75 +++++--- .../ChangelogLocalRecoveryITCase.java | 189 ++++++++++++++++++ .../ChangelogPeriodicMaterializationTestBase.java | 2 +- 12 files changed, 920 insertions(+), 140 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index e07980aa5c0..dc6d09fe4f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -137,6 +137,7 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -321,7 +322,7 @@ public class MiniCluster implements AutoCloseableAsync { WorkingDirectory.create( ClusterEntrypointUtils.generateWorkingDirectoryFile( configuration, - Optional.empty(), + Optional.of(PROCESS_WORKING_DIR_BASE), "minicluster_" + 
ResourceID.generate())); initializeIOFormatClasses(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java new file mode 100644 index 00000000000..a814589d45f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.LongPredicate; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Changelog's implementation of a {@link TaskLocalStateStore}. */ +public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl { + + private static final Logger LOG = LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class); + + private static final String CHANGE_LOG_CHECKPOINT_PREFIX = "changelog_chk_"; + + /** + * The mapping from checkpointId to materializationId. (cp3, materializationId2) means cp3 + * refers to m2. + */ + private final Map<Long, Long> mapToMaterializationId; + + /** Last checkpointId, to check whether checkpoint is out of order. 
*/ + private long lastCheckpointId = -1L; + + public ChangelogTaskLocalStateStore( + @Nonnull JobID jobID, + @Nonnull AllocationID allocationID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex, + @Nonnull LocalRecoveryConfig localRecoveryConfig, + @Nonnull Executor discardExecutor) { + super(jobID, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, discardExecutor); + this.mapToMaterializationId = new HashMap<>(); + } + + private void updateReference(long checkpointId, TaskStateSnapshot localState) { + if (localState == null) { + localState = NULL_DUMMY; + } + for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry : + localState.getSubtaskStateMappings()) { + for (KeyedStateHandle keyedStateHandle : + subtaskStateEntry.getValue().getManagedKeyedState()) { + if (keyedStateHandle instanceof ChangelogStateBackendHandle) { + ChangelogStateBackendHandle changelogStateBackendHandle = + (ChangelogStateBackendHandle) keyedStateHandle; + long materializationID = changelogStateBackendHandle.getMaterializationID(); + if (mapToMaterializationId.containsKey(checkpointId)) { + checkState( + materializationID == mapToMaterializationId.get(checkpointId), + "one checkpoint contains at most one materializationID"); + } else { + mapToMaterializationId.put(checkpointId, materializationID); + } + } + } + } + } + + @Override + public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) { + if (checkpointId < lastCheckpointId) { + LOG.info( + "Current checkpoint {} is out of order, smaller than last CheckpointId {}.", + lastCheckpointId, + checkpointId); + return; + } else { + lastCheckpointId = checkpointId; + } + synchronized (lock) { + updateReference(checkpointId, localState); + } + super.storeLocalState(checkpointId, localState); + } + + @Override + protected File getCheckpointDirectory(long checkpointId) { + return new File( + getLocalRecoveryDirectoryProvider().subtaskBaseDirectory(checkpointId), + 
CHANGE_LOG_CHECKPOINT_PREFIX + checkpointId); + } + + private void deleteMaterialization(LongPredicate pruningChecker) { + Set<Long> materializationToRemove; + synchronized (lock) { + Set<Long> checkpoints = + mapToMaterializationId.keySet().stream() + .filter(pruningChecker::test) + .collect(Collectors.toSet()); + materializationToRemove = + checkpoints.stream() + .map(mapToMaterializationId::remove) + .collect(Collectors.toSet()); + materializationToRemove.removeAll(mapToMaterializationId.values()); + } + + discardExecutor.execute( + () -> + syncDiscardDirectoryForCollection( + materializationToRemove.stream() + .map(super::getCheckpointDirectory) + .collect(Collectors.toList()))); + } + + private void syncDiscardDirectoryForCollection(Collection<File> toDiscard) { + for (File directory : toDiscard) { + if (directory.exists()) { + try { + // TODO: This is guaranteed by the wrapped backend only using this folder for + // its local state, the materialized handle should be discarded here too. + deleteDirectory(directory); + } catch (IOException ex) { + LOG.warn( + "Exception while deleting local state directory of {} in subtask ({} - {} - {}).", + directory, + jobID, + jobVertexID, + subtaskIndex, + ex); + } + } + } + } + + @Override + public void pruneCheckpoints(LongPredicate pruningChecker, boolean breakOnceCheckerFalse) { + // Scenarios: + // c1,m1 + // confirm c1, do nothing. 
+ // c2,m1 + // confirm c2, delete c1, don't delete m1 + // c3,m2 + // confirm c3, delete c2, delete m1 + + // delete changelog-chk + super.pruneCheckpoints(pruningChecker, false); + deleteMaterialization(pruningChecker); + } + + @Override + public CompletableFuture<Void> dispose() { + deleteMaterialization(id -> true); + synchronized (lock) { + mapToMaterializationId.clear(); + } + return super.dispose(); + } + + @Override + public String toString() { + return "ChangelogTaskLocalStateStore{" + + "jobID=" + + jobID + + ", jobVertexID=" + + jobVertexID + + ", allocationID=" + + allocationID.toHexString() + + ", subtaskIndex=" + + subtaskIndex + + ", localRecoveryConfig=" + + localRecoveryConfig + + ", storedCheckpointIDs=" + + storedTaskStateByCheckpointID.keySet() + + ", mapToMaterializationId=" + + mapToMaterializationId.entrySet() + + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index 22c894614ee..b4dcd6af729 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.configuration.StateChangelogOptionsInternal; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.FileUtils; @@ -122,7 +125,9 @@ public class TaskExecutorLocalStateStoresManager { @Nonnull JobID jobId, @Nonnull AllocationID allocationID, @Nonnull JobVertexID jobVertexID, - 
@Nonnegative int subtaskIndex) { + @Nonnegative int subtaskIndex, + Configuration clusterConfiguration, + Configuration jobConfiguration) { synchronized (lock) { if (closed) { @@ -164,22 +169,37 @@ public class TaskExecutorLocalStateStoresManager { LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); - taskLocalStateStore = - localRecoveryConfig.isLocalRecoveryEnabled() - ? - - // Real store implementation if local recovery is enabled - new TaskLocalStateStoreImpl( - jobId, - allocationID, - jobVertexID, - subtaskIndex, - localRecoveryConfig, - discardExecutor) - : - - // NOP implementation if local recovery is disabled - new NoOpTaskLocalStateStoreImpl(localRecoveryConfig); + boolean changelogEnabled = + jobConfiguration + .getOptional( + StateChangelogOptionsInternal + .ENABLE_CHANGE_LOG_FOR_APPLICATION) + .orElse( + clusterConfiguration.getBoolean( + StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)); + + if (localRecoveryConfig.isLocalRecoveryEnabled() && changelogEnabled) { + taskLocalStateStore = + new ChangelogTaskLocalStateStore( + jobId, + allocationID, + jobVertexID, + subtaskIndex, + localRecoveryConfig, + discardExecutor); + } else if (localRecoveryConfig.isLocalRecoveryEnabled()) { + taskLocalStateStore = + new TaskLocalStateStoreImpl( + jobId, + allocationID, + jobVertexID, + subtaskIndex, + localRecoveryConfig, + discardExecutor); + } else { + // NOP implementation if local recovery is disabled + taskLocalStateStore = new NoOpTaskLocalStateStoreImpl(localRecoveryConfig); + } taskStateManagers.put(taskKey, taskLocalStateStore); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index eef69184a6a..cf6a25545f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; @@ -42,7 +43,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -56,6 +56,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.LongPredicate; +import java.util.stream.Collectors; /** Main implementation of a {@link TaskLocalStateStore}. */ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { @@ -69,34 +70,34 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { public static final String TASK_STATE_SNAPSHOT_FILENAME = "_task_state_snapshot"; /** JobID from the owning subtask. */ - @Nonnull private final JobID jobID; + @Nonnull protected final JobID jobID; /** AllocationID of the owning slot. */ - @Nonnull private final AllocationID allocationID; + @Nonnull protected final AllocationID allocationID; /** JobVertexID of the owning subtask. */ - @Nonnull private final JobVertexID jobVertexID; + @Nonnull protected final JobVertexID jobVertexID; /** Subtask index of the owning subtask. */ - @Nonnegative private final int subtaskIndex; + @Nonnegative protected final int subtaskIndex; /** The configured mode for local recovery. 
*/ - @Nonnull private final LocalRecoveryConfig localRecoveryConfig; + @Nonnull protected final LocalRecoveryConfig localRecoveryConfig; /** Executor that runs the discarding of released state objects. */ - @Nonnull private final Executor discardExecutor; + @Nonnull protected final Executor discardExecutor; /** Lock for synchronisation on the storage map and the discarded status. */ - @Nonnull private final Object lock = new Object(); + @Nonnull protected final Object lock = new Object(); /** Status flag if this store was already discarded. */ @GuardedBy("lock") - private boolean disposed; + protected boolean disposed; /** Maps checkpoint ids to local TaskStateSnapshots. */ @Nonnull @GuardedBy("lock") - private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID; + protected final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID; public TaskLocalStateStoreImpl( @Nonnull JobID jobID, @@ -141,19 +142,19 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { subtaskIndex); } - Map.Entry<Long, TaskStateSnapshot> toDiscard = null; + Tuple2<Long, TaskStateSnapshot> toDiscard = null; synchronized (lock) { if (disposed) { // we ignore late stores and simply discard the state. 
- toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState); + toDiscard = Tuple2.of(checkpointId, localState); } else { TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState); persistLocalStateMetadata(checkpointId, localState); if (previous != null) { - toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous); + toDiscard = Tuple2.of(checkpointId, previous); } } } @@ -170,6 +171,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { * @param localState task state snapshot that will be persisted */ private void persistLocalStateMetadata(long checkpointId, TaskStateSnapshot localState) { + createFolderOrFail(getCheckpointDirectory(checkpointId)); final File taskStateSnapshotFile = getTaskStateSnapshotFile(checkpointId); try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(taskStateSnapshotFile))) { @@ -186,20 +188,25 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { @VisibleForTesting File getTaskStateSnapshotFile(long checkpointId) { - final File checkpointDirectory = - localRecoveryConfig - .getLocalStateDirectoryProvider() - .orElseThrow( - () -> new IllegalStateException("Local recovery must be enabled.")) - .subtaskSpecificCheckpointDirectory(checkpointId); + return new File(getCheckpointDirectory(checkpointId), TASK_STATE_SNAPSHOT_FILENAME); + } + + protected File getCheckpointDirectory(long checkpointId) { + return getLocalRecoveryDirectoryProvider().subtaskSpecificCheckpointDirectory(checkpointId); + } + private void createFolderOrFail(File checkpointDirectory) { if (!checkpointDirectory.exists() && !checkpointDirectory.mkdirs()) { throw new FlinkRuntimeException( String.format( "Could not create the checkpoint directory '%s'", checkpointDirectory)); } + } - return new File(checkpointDirectory, TASK_STATE_SNAPSHOT_FILENAME); + protected LocalRecoveryDirectoryProvider getLocalRecoveryDirectoryProvider() { + return localRecoveryConfig 
+ .getLocalStateDirectoryProvider() + .orElseThrow(() -> new IllegalStateException("Local recovery must be enabled.")); } @Override @@ -213,24 +220,15 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { } if (snapshot != null) { - if (LOG.isTraceEnabled()) { - LOG.trace( - "Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}", - checkpointID, - jobID, - jobVertexID, - subtaskIndex, - snapshot); - } else if (LOG.isDebugEnabled()) { - LOG.debug( - "Found registered local state for checkpoint {} in subtask ({} - {} - {})", - checkpointID, - jobID, - jobVertexID, - subtaskIndex); - } + LOG.info( + "Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}", + checkpointID, + jobID, + jobVertexID, + subtaskIndex, + snapshot); } else { - LOG.debug( + LOG.info( "Did not find registered local state for checkpoint {} in subtask ({} - {} - {})", checkpointID, jobID, @@ -321,11 +319,14 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { @Override public CompletableFuture<Void> dispose() { - Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy; + Collection<Tuple2<Long, TaskStateSnapshot>> statesCopy; synchronized (lock) { disposed = true; - statesCopy = new ArrayList<>(storedTaskStateByCheckpointID.entrySet()); + statesCopy = + storedTaskStateByCheckpointID.entrySet().stream() + .map(entry -> Tuple2.of(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); storedTaskStateByCheckpointID.clear(); } @@ -335,13 +336,11 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { syncDiscardLocalStateForCollection(statesCopy); // delete the local state subdirectory that belong to this subtask. 
- LocalRecoveryDirectoryProvider directoryProvider = - localRecoveryConfig - .getLocalStateDirectoryProvider() - .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()); - - for (int i = 0; i < directoryProvider.allocationBaseDirsCount(); ++i) { - File subtaskBaseDirectory = directoryProvider.selectSubtaskBaseDirectory(i); + for (int i = 0; + i < getLocalRecoveryDirectoryProvider().allocationBaseDirsCount(); + ++i) { + File subtaskBaseDirectory = + getLocalRecoveryDirectoryProvider().selectSubtaskBaseDirectory(i); try { deleteDirectory(subtaskBaseDirectory); } catch (IOException e) { @@ -359,16 +358,16 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { } private void asyncDiscardLocalStateForCollection( - Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) { + Collection<Tuple2<Long, TaskStateSnapshot>> toDiscard) { if (!toDiscard.isEmpty()) { discardExecutor.execute(() -> syncDiscardLocalStateForCollection(toDiscard)); } } private void syncDiscardLocalStateForCollection( - Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) { - for (Map.Entry<Long, TaskStateSnapshot> entry : toDiscard) { - discardLocalStateForCheckpoint(entry.getKey(), Optional.of(entry.getValue())); + Collection<Tuple2<Long, TaskStateSnapshot>> toDiscard) { + for (Tuple2<Long, TaskStateSnapshot> entry : toDiscard) { + discardLocalStateForCheckpoint(entry.f0, Optional.of(entry.f1)); } } @@ -409,39 +408,31 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { } }); - Optional<LocalRecoveryDirectoryProvider> directoryProviderOptional = - localRecoveryConfig.getLocalStateDirectoryProvider(); + File checkpointDir = getCheckpointDirectory(checkpointID); - if (directoryProviderOptional.isPresent()) { - File checkpointDir = - directoryProviderOptional - .get() - .subtaskSpecificCheckpointDirectory(checkpointID); + LOG.debug( + "Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).", + checkpointDir, + 
checkpointID, + jobID, + jobVertexID, + subtaskIndex); - LOG.debug( - "Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).", - checkpointDir, + try { + deleteDirectory(checkpointDir); + } catch (IOException ex) { + LOG.warn( + "Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).", checkpointID, jobID, jobVertexID, - subtaskIndex); - - try { - deleteDirectory(checkpointDir); - } catch (IOException ex) { - LOG.warn( - "Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).", - checkpointID, - jobID, - jobVertexID, - subtaskIndex, - ex); - } + subtaskIndex, + ex); } } /** Helper method to delete a directory. */ - private void deleteDirectory(File directory) throws IOException { + protected void deleteDirectory(File directory) throws IOException { Path path = new Path(directory.toURI()); FileSystem fileSystem = path.getFileSystem(); if (fileSystem.exists(path)) { @@ -450,9 +441,8 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { } /** Pruning the useless checkpoints, it should be called only when holding the {@link #lock}. 
*/ - private void pruneCheckpoints(LongPredicate pruningChecker, boolean breakOnceCheckerFalse) { - - final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>(); + protected void pruneCheckpoints(LongPredicate pruningChecker, boolean breakOnceCheckerFalse) { + final List<Tuple2<Long, TaskStateSnapshot>> toRemove = new ArrayList<>(); synchronized (lock) { Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator = @@ -464,7 +454,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { long entryCheckpointId = snapshotEntry.getKey(); if (pruningChecker.test(entryCheckpointId)) { - toRemove.add(snapshotEntry); + toRemove.add(Tuple2.of(entryCheckpointId, snapshotEntry.getValue())); entryIterator.remove(); } else if (breakOnceCheckerFalse) { break; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java new file mode 100644 index 00000000000..427b4427e93 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateHandleID; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * State handle for local copies of {@link ChangelogStateHandleStreamImpl}. Consists of a + * remoteHandle that maintains the mapping of local handle and remote handle, like + * sharedStateHandleIDs in {@link org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle}. + */ +public class ChangelogStateBackendLocalHandle implements ChangelogStateBackendHandle { + private static final long serialVersionUID = 1L; + private final List<KeyedStateHandle> localMaterialized; + private final List<ChangelogStateHandle> localNonMaterialized; + private final ChangelogStateBackendHandleImpl remoteHandle; + + public ChangelogStateBackendLocalHandle( + List<KeyedStateHandle> localMaterialized, + List<ChangelogStateHandle> localNonMaterialized, + ChangelogStateBackendHandleImpl remoteHandle) { + this.localMaterialized = localMaterialized; + this.localNonMaterialized = localNonMaterialized; + this.remoteHandle = remoteHandle; + } + + @Override + public List<KeyedStateHandle> getMaterializedStateHandles() { + return localMaterialized; + } + + @Override + public List<ChangelogStateHandle> getNonMaterializedStateHandles() { + return localNonMaterialized; + } + + @Override + public long getMaterializationID() { + return remoteHandle.getMaterializationID(); + } + + @Override + public ChangelogStateBackendHandle rebound(long checkpointId) { + throw new UnsupportedOperationException("Should not call here."); + } + + public List<KeyedStateHandle> getRemoteMaterializedStateHandles() { + return 
remoteHandle.getMaterializedStateHandles(); + } + + public List<ChangelogStateHandle> getRemoteNonMaterializedStateHandles() { + return remoteHandle.getNonMaterializedStateHandles(); + } + + @Override + public long getCheckpointId() { + return remoteHandle.getCheckpointId(); + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { + remoteHandle.registerSharedStates(stateRegistry, checkpointID); + } + + @Override + public long getCheckpointedSize() { + return remoteHandle.getCheckpointedSize(); + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return remoteHandle.getKeyGroupRange(); + } + + @Nullable + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + throw new UnsupportedOperationException( + "This is a local state handle for the TM side only."); + } + + @Override + public StateHandleID getStateHandleId() { + return remoteHandle.getStateHandleId(); + } + + @Override + public void discardState() throws Exception {} + + @Override + public long getStateSize() { + return remoteHandle.getStateSize(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index a7df344756c..3b04d8a7a40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -691,7 +691,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { jobId, tdd.getAllocationId(), taskInformation.getJobVertexId(), - tdd.getSubtaskIndex()); + tdd.getSubtaskIndex(), + taskManagerConfiguration.getConfiguration(), + jobInformation.getJobConfiguration()); // TODO: Pass config value from user program and do overriding here. 
final StateChangelogStorage<?> changelogStorage; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java new file mode 100644 index 00000000000..8493064f806 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** Test for {@link ChangelogTaskLocalStateStore}. 
*/ +public class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest { + + private LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider; + + @Before + @Override + public void before() throws Exception { + super.before(); + this.taskLocalStateStore = + createChangelogTaskLocalStateStore( + allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx); + } + + @Nonnull + private ChangelogTaskLocalStateStore createChangelogTaskLocalStateStore( + File[] allocationBaseDirs, + JobID jobID, + AllocationID allocationID, + JobVertexID jobVertexID, + int subtaskIdx) { + LocalRecoveryDirectoryProviderImpl directoryProvider = + new LocalRecoveryDirectoryProviderImpl( + allocationBaseDirs, jobID, jobVertexID, subtaskIdx); + this.localRecoveryDirectoryProvider = directoryProvider; + + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); + return new ChangelogTaskLocalStateStore( + jobID, + allocationID, + jobVertexID, + subtaskIdx, + localRecoveryConfig, + Executors.directExecutor()); + } + + @Test + @Override + public void pruneCheckpoints() throws Exception { + TestingTaskStateSnapshot stateSnapshot1 = storeChangelogStates(1, 1); + TestingTaskStateSnapshot stateSnapshot2 = storeChangelogStates(2, 1); + TestingTaskStateSnapshot stateSnapshot3 = storeChangelogStates(3, 1); + + taskLocalStateStore.pruneMatchingCheckpoints(id -> id != 2); + assertNull(taskLocalStateStore.retrieveLocalState(3)); + assertTrue(stateSnapshot3.isDiscarded()); + assertNull(taskLocalStateStore.retrieveLocalState(1)); + assertTrue(stateSnapshot1.isDiscarded()); + assertTrue(checkMaterializedDirExists(1)); + assertEquals(stateSnapshot2, taskLocalStateStore.retrieveLocalState(2)); + } + + @Test + @Override + public void confirmCheckpoint() throws Exception { + TestingTaskStateSnapshot stateSnapshot1 = storeChangelogStates(1, 1); + TestingTaskStateSnapshot stateSnapshot2 = storeChangelogStates(2, 1); + TestingTaskStateSnapshot stateSnapshot3 = 
storeChangelogStates(3, 1); + + taskLocalStateStore.confirmCheckpoint(3); + assertNull(taskLocalStateStore.retrieveLocalState(2)); + assertTrue(stateSnapshot2.isDiscarded()); + assertTrue(stateSnapshot1.isDiscarded()); + assertTrue(checkMaterializedDirExists(1)); + assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3)); + + TestingTaskStateSnapshot stateSnapshot4 = storeChangelogStates(4, 2); + taskLocalStateStore.confirmCheckpoint(4); + assertNull(taskLocalStateStore.retrieveLocalState(3)); + assertTrue(stateSnapshot3.isDiscarded()); + // delete materialization 1 + assertFalse(checkMaterializedDirExists(1)); + assertEquals(stateSnapshot4, taskLocalStateStore.retrieveLocalState(4)); + } + + @Test + @Override + public void abortCheckpoint() throws Exception { + TestingTaskStateSnapshot stateSnapshot1 = storeChangelogStates(1, 1); + TestingTaskStateSnapshot stateSnapshot2 = storeChangelogStates(2, 2); + TestingTaskStateSnapshot stateSnapshot3 = storeChangelogStates(3, 2); + taskLocalStateStore.abortCheckpoint(2); + assertNull(taskLocalStateStore.retrieveLocalState(2)); + assertTrue(stateSnapshot2.isDiscarded()); + // the materialized part of checkpoint 2 retain, because it still used by checkpoint 3 + assertTrue(checkMaterializedDirExists(2)); + assertTrue(checkMaterializedDirExists(1)); + assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3)); + + taskLocalStateStore.abortCheckpoint(3); + assertFalse(checkMaterializedDirExists(2)); + } + + @Test + public void retrievePersistedLocalStateFromDisc() { + final TaskStateSnapshot taskStateSnapshot = createTaskStateSnapshot(); + final long checkpointId = 0L; + taskLocalStateStore.storeLocalState(checkpointId, taskStateSnapshot); + final ChangelogTaskLocalStateStore newTaskLocalStateStore = + createChangelogTaskLocalStateStore( + allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx); + + final TaskStateSnapshot retrievedTaskStateSnapshot = + 
newTaskLocalStateStore.retrieveLocalState(checkpointId); + + assertThat(retrievedTaskStateSnapshot).isEqualTo(taskStateSnapshot); + } + + @Test + public void deletesLocalStateIfRetrievalFails() throws IOException { + final TaskStateSnapshot taskStateSnapshot = createTaskStateSnapshot(); + final long checkpointId = 0L; + taskLocalStateStore.storeLocalState(checkpointId, taskStateSnapshot); + + final File taskStateSnapshotFile = + taskLocalStateStore.getTaskStateSnapshotFile(checkpointId); + + Files.write( + taskStateSnapshotFile.toPath(), new byte[] {1, 2, 3, 4}, StandardOpenOption.WRITE); + + final ChangelogTaskLocalStateStore newTaskLocalStateStore = + createChangelogTaskLocalStateStore( + allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx); + + assertThat(newTaskLocalStateStore.retrieveLocalState(checkpointId)).isNull(); + assertThat(taskStateSnapshotFile.getParentFile()).doesNotExist(); + } + + private boolean checkMaterializedDirExists(long materializationID) { + File materializedDir = + localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory( + materializationID); + return materializedDir.exists(); + } + + private void writeToMaterializedDir(long materializationID) { + File materializedDir = + localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory( + materializationID); + if (!materializedDir.exists() && !materializedDir.mkdirs()) { + throw new FlinkRuntimeException( + String.format( + "Could not create the materialized directory '%s'", materializedDir)); + } + } + + private TestingTaskStateSnapshot storeChangelogStates( + long checkpointID, long materializationID) { + writeToMaterializedDir(materializationID); + OperatorID operatorID = new OperatorID(); + TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot(); + OperatorSubtaskState operatorSubtaskState = + OperatorSubtaskState.builder() + .setManagedKeyedState( + new ChangelogStateBackendHandleImpl( + Collections.emptyList(), + Collections.emptyList(), 
+ new KeyGroupRange(0, 3), + checkpointID, + materializationID, + checkpointID)) + .build(); + taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); + taskLocalStateStore.storeLocalState(checkpointID, taskStateSnapshot); + return taskStateSnapshot; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 5fd6ddff67f..ef10986e28e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -166,7 +166,12 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { TaskLocalStateStore taskLocalStateStore = storesManager.localStateStoreForSubtask( - jobID, allocationID, jobVertexID, subtaskIdx); + jobID, + allocationID, + jobVertexID, + subtaskIdx, + new Configuration(), + new Configuration()); Assert.assertFalse(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled()); Assert.assertNull( @@ -202,7 +207,12 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { TaskLocalStateStore taskLocalStateStore = storesManager.localStateStoreForSubtask( - jobID, allocationID, jobVertexID, subtaskIdx); + jobID, + allocationID, + jobVertexID, + subtaskIdx, + new Configuration(), + new Configuration()); LocalRecoveryDirectoryProvider directoryProvider = taskLocalStateStore @@ -258,7 +268,12 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { taskLocalStateStore = storesManager.localStateStoreForSubtask( - jobID, otherAllocationID, jobVertexID, subtaskIdx); + jobID, + otherAllocationID, + jobVertexID, + subtaskIdx, + new Configuration(), + new Configuration()); directoryProvider = taskLocalStateStore @@ -338,9 +353,14 @@ public class 
TaskExecutorLocalStateStoresManagerTest extends TestLogger { // register local state stores taskExecutorLocalStateStoresManager.localStateStoreForSubtask( - jobId, retainedAllocationId, jobVertexId, 0); + jobId, + retainedAllocationId, + jobVertexId, + 0, + new Configuration(), + new Configuration()); taskExecutorLocalStateStoresManager.localStateStoreForSubtask( - jobId, otherAllocationId, jobVertexId, 1); + jobId, otherAllocationId, jobVertexId, 1, new Configuration(), new Configuration()); final Collection<Path> allocationDirectories = TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java index 8c906b51fa2..661e39d876e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java @@ -51,13 +51,13 @@ import static org.junit.Assert.assertTrue; /** Test for the {@link TaskLocalStateStoreImpl}. 
*/ public class TaskLocalStateStoreImplTest extends TestLogger { - private TemporaryFolder temporaryFolder; - private File[] allocationBaseDirs; - private TaskLocalStateStoreImpl taskLocalStateStore; - private JobID jobID; - private AllocationID allocationID; - private JobVertexID jobVertexID; - private int subtaskIdx; + protected TemporaryFolder temporaryFolder; + protected File[] allocationBaseDirs; + protected TaskLocalStateStoreImpl taskLocalStateStore; + protected JobID jobID; + protected AllocationID allocationID; + protected JobVertexID jobVertexID; + protected int subtaskIdx; @Before public void before() throws Exception { @@ -216,7 +216,7 @@ public class TaskLocalStateStoreImplTest extends TestLogger { } @Nonnull - private TaskStateSnapshot createTaskStateSnapshot() { + protected TaskStateSnapshot createTaskStateSnapshot() { final Map<OperatorID, OperatorSubtaskState> operatorSubtaskStates = new HashMap<>(); operatorSubtaskStates.put(new OperatorID(), OperatorSubtaskState.builder().build()); operatorSubtaskStates.put(new OperatorID(), OperatorSubtaskState.builder().build()); @@ -273,7 +273,7 @@ public class TaskLocalStateStoreImplTest extends TestLogger { return taskStateSnapshots; } - private static final class TestingTaskStateSnapshot extends TaskStateSnapshot { + protected static final class TestingTaskStateSnapshot extends TaskStateSnapshot { private static final long serialVersionUID = 2046321877379917040L; private boolean isDiscarded = false; diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index 1935600fd4e..a89c22f3a58 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestableKeyedStateBackend; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -381,11 +382,12 @@ public class ChangelogKeyedStateBackend<K> changelogTruncateHelper.checkpoint(checkpointId, lastUploadedTo); LOG.info( - "snapshot of {} for checkpoint {}, change range: {}..{}", + "snapshot of {} for checkpoint {}, change range: {}..{}, materialization ID {}", subtaskName, checkpointId, lastUploadedFrom, - lastUploadedTo); + lastUploadedTo, + changelogSnapshotState.getMaterializationID()); ChangelogSnapshotState changelogStateBackendStateCopy = changelogSnapshotState; @@ -430,25 +432,20 @@ public class ChangelogKeyedStateBackend<K> && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) { return SnapshotResult.empty(); } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) { - return SnapshotResult.withLocalState( + ChangelogStateBackendHandleImpl jmHandle = new ChangelogStateBackendHandleImpl( changelogStateBackendStateCopy.getMaterializedSnapshot(), prevDeltaCopy, getKeyGroupRange(), checkpointId, changelogStateBackendStateCopy.materializationID, - persistedSizeOfThisCheckpoint), - new ChangelogStateBackendHandleImpl( + persistedSizeOfThisCheckpoint); + return SnapshotResult.withLocalState( + jmHandle, + new ChangelogStateBackendLocalHandle( 
changelogStateBackendStateCopy.getLocalMaterializedSnapshot(), - // TODO: Restore ChangelogStateHandles from remote temporarily, because - // ChangelogStateHandles are small(about 10MB). - // In the future, the double-stream option may be implemented according - // to the test results. prevDeltaCopy, - getKeyGroupRange(), - checkpointId, - changelogStateBackendStateCopy.materializationID, - persistedSizeOfThisCheckpoint)); + jmHandle)); } else { return SnapshotResult.of( new ChangelogStateBackendHandleImpl( @@ -630,16 +627,39 @@ public class ChangelogKeyedStateBackend<K> List<KeyedStateHandle> materialized = new ArrayList<>(); List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>(); + List<KeyedStateHandle> localMaterialized = new ArrayList<>(); + List<ChangelogStateHandle> localRestoredNonMaterialized = new ArrayList<>(); + for (ChangelogStateBackendHandle h : stateHandles) { if (h != null) { - materialized.addAll(h.getMaterializedStateHandles()); - restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); + if (h instanceof ChangelogStateBackendLocalHandle) { + ChangelogStateBackendLocalHandle localHandle = + (ChangelogStateBackendLocalHandle) h; + materialized.addAll(localHandle.getRemoteMaterializedStateHandles()); + restoredNonMaterialized.addAll( + localHandle.getRemoteNonMaterializedStateHandles()); + localMaterialized.addAll(localHandle.getMaterializedStateHandles()); + localRestoredNonMaterialized.addAll( + localHandle.getNonMaterializedStateHandles()); + } else { + materialized.addAll(h.getMaterializedStateHandles()); + restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); + } // choose max materializationID to handle rescaling materializationId = Math.max(materializationId, h.getMaterializationID()); } } this.materializedId = materializationId + 1; - // Todo: distinguish whether the handle is local or remote + + if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { + return new 
ChangelogSnapshotState( + materialized, + localMaterialized, + restoredNonMaterialized, + localRestoredNonMaterialized, + stateChangelogWriter.initialSequenceNumber(), + materializationId); + } return new ChangelogSnapshotState( materialized, restoredNonMaterialized, @@ -872,22 +892,21 @@ public class ChangelogKeyedStateBackend<K> List<ChangelogStateHandle> localRestoredNonMaterialized, SequenceNumber materializedTo, long materializationID) { + ChangelogStateBackendHandleImpl jmHandle = + new ChangelogStateBackendHandleImpl( + materializedSnapshot, + restoredNonMaterialized, + getKeyGroupRange(), + lastCheckpointId, + materializationID, + 0L); this.changelogSnapshot = SnapshotResult.withLocalState( - new ChangelogStateBackendHandleImpl( - materializedSnapshot, - restoredNonMaterialized, - getKeyGroupRange(), - lastCheckpointId, - materializationID, - 0L), - new ChangelogStateBackendHandleImpl( + jmHandle, + new ChangelogStateBackendLocalHandle( localMaterializedSnapshot, localRestoredNonMaterialized, - getKeyGroupRange(), - lastCheckpointId, - materializationID, - 0L)); + jmHandle)); this.materializedTo = materializedTo; this.materializationID = materializationID; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java new file mode 100644 index 00000000000..1f770de6066 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CollectionSink; +import org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CountFunction; +import org.apache.flink.test.util.InfiniteIntegerSource; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import 
org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY; +import static org.apache.flink.configuration.ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE; +import static org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE; +import static org.apache.flink.configuration.ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; +import static org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.getAllStateHandleId; + +/** + * Local recovery IT case for changelog. It never fails because local recovery is nice but not + * necessary. 
+ */ +@RunWith(Parameterized.class) +public class ChangelogLocalRecoveryITCase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 2; + private static final int NUM_TASK_SLOTS = 1; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Parameterized.Parameter public AbstractStateBackend delegatedStateBackend; + + @Parameterized.Parameters(name = "delegated state backend type = {0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(false), + new EmbeddedRocksDBStateBackend(true)); + } + + private MiniClusterWithClientResource cluster; + private static String workingDir; + + @BeforeClass + public static void setWorkingDir() throws IOException { + workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath(); + } + + @Before + public void setup() throws Exception { + Configuration configuration = new Configuration(); + configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1); + + configuration.setString(PROCESS_WORKING_DIR_BASE, workingDir); + configuration.setString(JOB_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir); + configuration.setString(TASK_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir); + configuration.setBoolean(LOCAL_RECOVERY, true); + FsStateChangelogStorageFactory.configure( + configuration, TEMPORARY_FOLDER.newFolder(), Duration.ofMillis(1000), 1); + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend(); + } + + @After + public void teardown() { + cluster.after(); + } + + private JobGraph buildJobGraph(StreamExecutionEnvironment env) { + env.addSource(new InfiniteIntegerSource()) + .setParallelism(1) + 
.keyBy(element -> element) + .process(new CountFunction()) + .addSink(new CollectionSink()) + .setParallelism(1); + return env.getStreamGraph().getJobGraph(); + } + + @Test + public void testRestartTM() throws Exception { + File checkpointFolder = TEMPORARY_FOLDER.newFolder(); + MiniCluster miniCluster = cluster.getMiniCluster(); + StreamExecutionEnvironment env1 = + getEnv(delegatedStateBackend, checkpointFolder, true, 200, 800); + JobGraph firstJobGraph = buildJobGraph(env1); + + miniCluster.submitJob(firstJobGraph).get(); + waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false); + // wait job for doing materialization. + waitUntilCondition( + () -> !getAllStateHandleId(firstJobGraph.getJobID(), miniCluster).isEmpty()); + miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get(); + CompletableFuture<Void> terminationFuture = miniCluster.terminateTaskManager(1); + terminationFuture.get(); + miniCluster.startTaskManager(); + waitForAllTaskRunning( + () -> + miniCluster + .getExecutionGraph(firstJobGraph.getJobID()) + .get(10000, TimeUnit.SECONDS), + false); + + waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false); + miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get(); + } + + private StreamExecutionEnvironment getEnv( + StateBackend stateBackend, + File checkpointFile, + boolean changelogEnabled, + long checkpointInterval, + long materializationInterval) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(checkpointInterval); + env.getCheckpointConfig().enableUnalignedCheckpoints(false); + env.setStateBackend(stateBackend) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10)); + env.configure(new Configuration().set(LOCAL_RECOVERY, true)); + + env.getCheckpointConfig().setCheckpointStorage(checkpointFile.toURI()); + env.enableChangelogStateBackend(changelogEnabled); + env.configure( + new Configuration() + .set( + 
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(materializationInterval)) + .set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1)); + env.getCheckpointConfig() + .setExternalizedCheckpointCleanup( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + Configuration configuration = new Configuration(); + configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1); + env.configure(configuration); + return env; + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java index 97fb8cecda9..6220e29bb0d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java @@ -219,7 +219,7 @@ public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogge return JobID.fromByteArray(randomBytes); } - protected static Set<StateHandleID> getAllStateHandleId(JobID jobID, MiniCluster miniCluster) + public static Set<StateHandleID> getAllStateHandleId(JobID jobID, MiniCluster miniCluster) throws IOException, FlinkJobNotFoundException, ExecutionException, InterruptedException { Optional<String> mostRecentCompletedCheckpointPath =
