This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a078a632cbbba5f0885e5eed87376849a161050
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Feb 26 15:45:08 2019 +0100

    [FLINK-11743] Fix problem with restoring incremental checkpoints from local state

    This corrects a problem that was introduced with the refactorings in FLINK-10043.

    This closes #7841.
---
 .../runtime/state/BackendBuildingException.java    |   4 +
 .../RocksDBIncrementalRestoreOperation.java        | 218 +++++++++++----------
 .../RocksDBIncrementalRestorePrepareResult.java    |  46 -----
 3 files changed, 116 insertions(+), 152 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java
index 109b3ab..73c81f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java
@@ -27,4 +27,8 @@ public class BackendBuildingException extends IOException {
     public BackendBuildingException(String message, Throwable cause) {
         super(message, cause);
     }
+
+    public BackendBuildingException(String message) {
+        super(message);
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 87365a2..b085d51 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -157,42 +157,97 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
      * Recovery from a single remote incremental state without rescaling.
      */
     private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
-        // 1. Prepare for restore without rescaling
-        Path temporaryRestoreInstancePath = new Path(
-            instanceBasePath.getAbsolutePath(),
-            UUID.randomUUID().toString()); // used as restore source for IncrementalKeyedStateHandle
-        RocksDBIncrementalRestorePrepareResult prepareResult = prepareFiles(rawStateHandle, temporaryRestoreInstancePath);
-        Path restoreSourcePath = prepareResult.getLocalKeyedStateHandle().getDirectoryStateHandle().getDirectory();
         if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
-            backendUID = ((IncrementalKeyedStateHandle) rawStateHandle).getBackendIdentifier();
+            IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
+            restorePreviousIncrementalFilesStatus(incrementalKeyedStateHandle);
+            restoreFromRemoteState(incrementalKeyedStateHandle);
+        } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
+            IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
+                (IncrementalLocalKeyedStateHandle) rawStateHandle;
+            restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
+            restoreFromLocalState(incrementalLocalKeyedStateHandle);
         } else {
-            backendUID = ((IncrementalLocalKeyedStateHandle) rawStateHandle).getBackendIdentifier();
+            throw new BackendBuildingException("Unexpected state handle type, " +
+                "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
+                ", but found " + rawStateHandle.getClass());
         }
+    }
+
+    private void restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHandle localKeyedStateHandle) {
+        backendUID = localKeyedStateHandle.getBackendIdentifier();
+        restoredSstFiles.put(
+            localKeyedStateHandle.getCheckpointId(),
+            localKeyedStateHandle.getSharedStateHandleIDs());
+        lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId();
+    }
+
+    private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle remoteKeyedStateHandle) {
+        backendUID = remoteKeyedStateHandle.getBackendIdentifier();
+        restoredSstFiles.put(
+            remoteKeyedStateHandle.getCheckpointId(),
+            remoteKeyedStateHandle.getSharedState().keySet());
+        lastCompletedCheckpointId = remoteKeyedStateHandle.getCheckpointId();
+    }
+
+    private void restoreFromRemoteState(IncrementalKeyedStateHandle stateHandle) throws Exception {
+        final Path tmpRestoreInstancePath = new Path(
+            instanceBasePath.getAbsolutePath(),
+            UUID.randomUUID().toString()); // used as restore source for IncrementalKeyedStateHandle
+        try {
+            restoreFromLocalState(
+                transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
+        } finally {
+            cleanUpPathQuietly(tmpRestoreInstancePath);
+        }
+    }
+
+    private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
+        KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState());
+        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
+        columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
+        columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
+
+        Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
+
         LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
-            this.operatorIdentifier, this.backendUID);
+            operatorIdentifier, backendUID);
+
         if (!instanceRocksDBPath.mkdirs()) {
             String errMsg = "Could not create RocksDB data directory: " + instanceBasePath.getAbsolutePath();
             LOG.error(errMsg);
             throw new IOException(errMsg);
         }
-        try {
-            restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
-        } finally {
-            cleanUpPathQuietly(restoreSourcePath);
-        }
-        // 2. Open db instance
+
+        restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
+
         openDB();
-        // 3. Use the restore sst files as the base for succeeding checkpoints
-        IncrementalLocalKeyedStateHandle restoreStateHandle = prepareResult.getLocalKeyedStateHandle();
-        restoredSstFiles.put(
+
+        registerColumnFamilyHandles(stateMetaInfoSnapshots);
+    }
+
+    private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
+        Path temporaryRestoreInstancePath,
+        IncrementalKeyedStateHandle restoreStateHandle) throws Exception {
+
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(numberOfTransferringThreads)) {
+            rocksDBStateDownloader.transferAllStateDataToDirectory(
+                restoreStateHandle,
+                temporaryRestoreInstancePath,
+                cancelStreamRegistry);
+        }
+
+        // since we transferred all remote state to a local directory, we can use the same code as for
+        // local recovery.
+        return new IncrementalLocalKeyedStateHandle(
+            restoreStateHandle.getBackendIdentifier(),
             restoreStateHandle.getCheckpointId(),
-            restoreStateHandle.getSharedStateHandleIDs());
-        lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
-        // 4. Register column family handles
-        registerCFHandles(prepareResult.getStateMetaInfoSnapshots());
+            new DirectoryStateHandle(temporaryRestoreInstancePath),
+            restoreStateHandle.getKeyGroupRange(),
+            restoreStateHandle.getMetaStateHandle(),
+            restoreStateHandle.getSharedState().keySet());
     }

-    private void cleanUpPathQuietly(Path path) {
+    private void cleanUpPathQuietly(@Nonnull Path path) {
         try {
             FileSystem fileSystem = path.getFileSystem();
             if (fileSystem.exists(path)) {
@@ -203,7 +258,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         }
     }

-    private void registerCFHandles(List<StateMetaInfoSnapshot> metaInfoSnapshots)
+    private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> metaInfoSnapshots)
         throws BackendBuildingException {
         // Register CF handlers
         for (int i = 0; i < metaInfoSnapshots.size(); ++i) {
@@ -219,65 +274,26 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         }
     }

-    private RocksDBIncrementalRestorePrepareResult prepareFiles(
-        KeyedStateHandle rawStateHandle,
-        Path restoreInstancePath) throws BackendBuildingException {
-        final IncrementalLocalKeyedStateHandle localKeyedStateHandle;
-        try {
-            if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
-                IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
-
-                // read state data.
-                try (RocksDBStateDownloader rocksDBStateDownloader =
-                    new RocksDBStateDownloader(numberOfTransferringThreads)) {
-                    rocksDBStateDownloader.transferAllStateDataToDirectory(
-                        restoreStateHandle,
-                        restoreInstancePath,
-                        cancelStreamRegistry);
-                }
-
-                // since we transferred all remote state to a local directory, we can use the same code as for
-                // local recovery.
-                localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
-                    restoreStateHandle.getBackendIdentifier(),
-                    restoreStateHandle.getCheckpointId(),
-                    new DirectoryStateHandle(restoreInstancePath),
-                    restoreStateHandle.getKeyGroupRange(),
-                    restoreStateHandle.getMetaStateHandle(),
-                    restoreStateHandle.getSharedState().keySet());
-            } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
-                // Recovery from local incremental state.
-                localKeyedStateHandle = (IncrementalLocalKeyedStateHandle) rawStateHandle;
-            } else {
-                throw new IllegalStateException("Unexpected state handle type, " +
-                    "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
-                    ", but found " + rawStateHandle.getClass());
-            }
-
-            KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState());
-            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
-            columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
-            columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
-            return new RocksDBIncrementalRestorePrepareResult(stateMetaInfoSnapshots, localKeyedStateHandle);
-        } catch (Exception e) {
-            // clean up
-            cleanUpPathQuietly(restoreInstancePath);
-            // log and rethrow
-            String errMsg = "Failed to prepare files for restore from incremental state handle.";
-            LOG.error(errMsg, e);
-            throw new BackendBuildingException(errMsg, e);
-        }
-    }
-
     /**
     * Recovery from multi incremental states with rescaling. For rescaling, this method creates a temporary
     * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
     * real restore instance and then the temporary instance is discarded.
     */
     private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
-        // Init DB instance
-        initDBWithRescaling(restoreStateHandles);
-        // Do restore
+
+        // Prepare for restore with rescaling
+        KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+            restoreStateHandles, keyGroupRange);
+
+        // Init base DB instance
+        if (initialHandle != null) {
+            restoreStateHandles.remove(initialHandle);
+            initDBWithRescaling(initialHandle);
+        } else {
+            openDB();
+        }
+
+        // Transfer remaining key-groups from temporary instance into base DB
         byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
         RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
@@ -333,35 +349,25 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         }
     }

-    private void initDBWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws IOException {
-        // Prepare for restore with rescaling
-        IncrementalKeyedStateHandle initialHandle =
-            (IncrementalKeyedStateHandle) RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                restoreStateHandles, keyGroupRange);
-        Path instancePath = new Path(dbPath);
-        if (initialHandle != null) {
-            restoreStateHandles.remove(initialHandle);
-            // 1. Download target files
-            RocksDBIncrementalRestorePrepareResult prepareResult = prepareFiles(initialHandle, instancePath);
-            // 2. Open db instance
-            openDB();
-            // 3. Clip the DB instance
-            try {
-                RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
-                    db,
-                    columnFamilyHandles,
-                    keyGroupRange,
-                    initialHandle.getKeyGroupRange(),
-                    keyGroupPrefixBytes);
-            } catch (RocksDBException e) {
-                String errMsg = "Failed to clip DB after initialization.";
-                LOG.error(errMsg, e);
-                throw new BackendBuildingException(errMsg, e);
-            }
-            // 4. Register column family handles
-            registerCFHandles(prepareResult.getStateMetaInfoSnapshots());
-        } else {
-            openDB();
+    private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {
+
+        assert (initialHandle instanceof IncrementalKeyedStateHandle);
+
+        // 1. Restore base DB from selected initial handle
+        restoreFromRemoteState((IncrementalKeyedStateHandle) initialHandle);
+
+        // 2. Clip the base DB instance
+        try {
+            RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
+                db,
+                columnFamilyHandles,
+                keyGroupRange,
+                initialHandle.getKeyGroupRange(),
+                keyGroupPrefixBytes);
+        } catch (RocksDBException e) {
+            String errMsg = "Failed to clip DB after initialization.";
+            LOG.error(errMsg, e);
+            throw new BackendBuildingException(errMsg, e);
         }
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestorePrepareResult.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestorePrepareResult.java
deleted file mode 100644
index 414ccfa..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestorePrepareResult.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.restore;
-
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-
-import java.util.List;
-
-/**
- * Entity holding restore preparation result for RocksDB incremental restore.
- */
-public class RocksDBIncrementalRestorePrepareResult {
-    private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-    private final IncrementalLocalKeyedStateHandle localKeyedStateHandle;
-
-    public RocksDBIncrementalRestorePrepareResult(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
-        IncrementalLocalKeyedStateHandle localKeyedStateHandle) {
-        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-        this.localKeyedStateHandle = localKeyedStateHandle;
-    }
-
-    public List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots() {
-        return stateMetaInfoSnapshots;
-    }
-
-    public IncrementalLocalKeyedStateHandle getLocalKeyedStateHandle() {
-        return localKeyedStateHandle;
-    }
-}
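
For readers who want the new control flow at a glance, below is a condensed, self-contained Java sketch of the dispatch this commit introduces in restoreWithoutRescaling. It is not Flink code: RemoteHandle and LocalHandle are simplified stand-ins for IncrementalKeyedStateHandle and IncrementalLocalKeyedStateHandle, and the actual RocksDB restore work is reduced to a log line. It only illustrates the shape of the change visible in the diff above: remote restore downloads into a temporary directory and then reuses the local-restore path, and only that temporary download directory is cleaned up afterwards.

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.UUID;
import java.util.stream.Stream;

/** Condensed illustration of the restore dispatch; simplified stand-ins, not Flink's real classes. */
public class RestoreDispatchSketch {

    interface KeyedStateHandle {}

    /** Stand-in for IncrementalKeyedStateHandle (state in remote/DFS storage). */
    static class RemoteHandle implements KeyedStateHandle {}

    /** Stand-in for IncrementalLocalKeyedStateHandle (task-local state directory). */
    static class LocalHandle implements KeyedStateHandle {
        final Path directory;
        LocalHandle(Path directory) { this.directory = directory; }
    }

    private final Path instanceBasePath;

    RestoreDispatchSketch(Path instanceBasePath) {
        this.instanceBasePath = instanceBasePath;
    }

    void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
        if (rawStateHandle instanceof RemoteHandle) {
            restoreFromRemoteState((RemoteHandle) rawStateHandle);
        } else if (rawStateHandle instanceof LocalHandle) {
            restoreFromLocalState((LocalHandle) rawStateHandle);
        } else {
            throw new IllegalStateException("Unexpected state handle type: " + rawStateHandle.getClass());
        }
    }

    void restoreFromRemoteState(RemoteHandle handle) throws Exception {
        // Remote restore downloads into a private temporary directory ...
        Path tmpRestorePath = instanceBasePath.resolve(UUID.randomUUID().toString());
        try {
            Files.createDirectories(tmpRestorePath);
            // ... and then reuses the local-restore code path on that directory.
            restoreFromLocalState(new LocalHandle(tmpRestorePath));
        } finally {
            // Only the temporary download directory is deleted on this path; a directory
            // handed in by local recovery is not passed through this cleanup.
            cleanUpPathQuietly(tmpRestorePath);
        }
    }

    void restoreFromLocalState(LocalHandle handle) {
        // In Flink this reads the metadata, moves the files into the RocksDB instance
        // directory, opens the DB and registers column families; here it only logs.
        System.out.println("restoring RocksDB instance from " + handle.directory);
    }

    private static void cleanUpPathQuietly(Path path) {
        // Best-effort recursive delete, mirroring the "quiet" cleanup in the real operation.
        try (Stream<Path> files = Files.walk(path)) {
            files.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (Exception e) {
            // swallow; cleanup failures must not fail the restore
        }
    }

    public static void main(String[] args) throws Exception {
        RestoreDispatchSketch sketch = new RestoreDispatchSketch(Files.createTempDirectory("restore-sketch"));
        sketch.restoreWithoutRescaling(new RemoteHandle());
        sketch.restoreWithoutRescaling(new LocalHandle(Files.createTempDirectory("local-state")));
    }
}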
