This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a078a632cbbba5f0885e5eed87376849a161050
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Feb 26 15:45:08 2019 +0100

    [FLINK-11743] Fix problem with restoring incremental checkpoints from local state
    
    This corrects a problem that was introduced with the refactorings in FLINK-10043.
    
    This closes #7841.
---
 .../runtime/state/BackendBuildingException.java    |   4 +
 .../RocksDBIncrementalRestoreOperation.java        | 218 +++++++++++----------
 .../RocksDBIncrementalRestorePrepareResult.java    |  46 -----
 3 files changed, 116 insertions(+), 152 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java
index 109b3ab..73c81f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendBuildingException.java
@@ -27,4 +27,8 @@ public class BackendBuildingException extends IOException {
        public BackendBuildingException(String message, Throwable cause) {
                super(message, cause);
        }
+
+       public BackendBuildingException(String message) {
+               super(message);
+       }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 87365a2..b085d51 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -157,42 +157,97 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
         * Recovery from a single remote incremental state without rescaling.
         */
        private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) 
throws Exception {
-               // 1. Prepare for restore without rescaling
-               Path temporaryRestoreInstancePath = new Path(
-                       instanceBasePath.getAbsolutePath(),
-                       UUID.randomUUID().toString()); // used as restore 
source for IncrementalKeyedStateHandle
-               RocksDBIncrementalRestorePrepareResult prepareResult = 
prepareFiles(rawStateHandle, temporaryRestoreInstancePath);
-               Path restoreSourcePath = 
prepareResult.getLocalKeyedStateHandle().getDirectoryStateHandle().getDirectory();
                if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
-                       backendUID = ((IncrementalKeyedStateHandle) 
rawStateHandle).getBackendIdentifier();
+                       IncrementalKeyedStateHandle incrementalKeyedStateHandle 
= (IncrementalKeyedStateHandle) rawStateHandle;
+                       
restorePreviousIncrementalFilesStatus(incrementalKeyedStateHandle);
+                       restoreFromRemoteState(incrementalKeyedStateHandle);
+               } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+                       IncrementalLocalKeyedStateHandle 
incrementalLocalKeyedStateHandle =
+                               (IncrementalLocalKeyedStateHandle) 
rawStateHandle;
+                       
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
+                       restoreFromLocalState(incrementalLocalKeyedStateHandle);
                } else {
-                       backendUID = ((IncrementalLocalKeyedStateHandle) 
rawStateHandle).getBackendIdentifier();
+                       throw new BackendBuildingException("Unexpected state 
handle type, " +
+                               "expected " + IncrementalKeyedStateHandle.class 
+ " or " + IncrementalLocalKeyedStateHandle.class +
+                               ", but found " + rawStateHandle.getClass());
                }
+       }
+
+       private void 
restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) {
+               backendUID = localKeyedStateHandle.getBackendIdentifier();
+               restoredSstFiles.put(
+                       localKeyedStateHandle.getCheckpointId(),
+                       localKeyedStateHandle.getSharedStateHandleIDs());
+               lastCompletedCheckpointId = 
localKeyedStateHandle.getCheckpointId();
+       }
+
+       private void 
restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle 
remoteKeyedStateHandle) {
+               backendUID = remoteKeyedStateHandle.getBackendIdentifier();
+               restoredSstFiles.put(
+                       remoteKeyedStateHandle.getCheckpointId(),
+                       remoteKeyedStateHandle.getSharedState().keySet());
+               lastCompletedCheckpointId = 
remoteKeyedStateHandle.getCheckpointId();
+       }
+
+       private void restoreFromRemoteState(IncrementalKeyedStateHandle 
stateHandle) throws Exception {
+               final Path tmpRestoreInstancePath = new Path(
+                       instanceBasePath.getAbsolutePath(),
+                       UUID.randomUUID().toString()); // used as restore 
source for IncrementalKeyedStateHandle
+               try {
+                       restoreFromLocalState(
+                               
transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
+               } finally {
+                       cleanUpPathQuietly(tmpRestoreInstancePath);
+               }
+       }
+
+       private void restoreFromLocalState(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) throws Exception {
+               KeyedBackendSerializationProxy<K> serializationProxy = 
readMetaData(localKeyedStateHandle.getMetaDataState());
+               List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = 
serializationProxy.getStateMetaInfoSnapshots();
+               columnFamilyDescriptors = 
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
+               columnFamilyHandles = new 
ArrayList<>(columnFamilyDescriptors.size() + 1);
+
+               Path restoreSourcePath = 
localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
+
                LOG.debug("Restoring keyed backend uid in operator {} from 
incremental snapshot to {}.",
-                       this.operatorIdentifier, this.backendUID);
+                       operatorIdentifier, backendUID);
+
                if (!instanceRocksDBPath.mkdirs()) {
                        String errMsg = "Could not create RocksDB data 
directory: " + instanceBasePath.getAbsolutePath();
                        LOG.error(errMsg);
                        throw new IOException(errMsg);
                }
-               try {
-                       restoreInstanceDirectoryFromPath(restoreSourcePath, 
dbPath);
-               } finally {
-                       cleanUpPathQuietly(restoreSourcePath);
-               }
-               // 2. Open db instance
+
+               restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
+
                openDB();
-               // 3. Use the restore sst files as the base for succeeding 
checkpoints
-               IncrementalLocalKeyedStateHandle restoreStateHandle = 
prepareResult.getLocalKeyedStateHandle();
-               restoredSstFiles.put(
+
+               registerColumnFamilyHandles(stateMetaInfoSnapshots);
+       }
+
+       private IncrementalLocalKeyedStateHandle 
transferRemoteStateToLocalDirectory(
+               Path temporaryRestoreInstancePath,
+               IncrementalKeyedStateHandle restoreStateHandle) throws 
Exception {
+
+               try (RocksDBStateDownloader rocksDBStateDownloader = new 
RocksDBStateDownloader(numberOfTransferringThreads)) {
+                       rocksDBStateDownloader.transferAllStateDataToDirectory(
+                               restoreStateHandle,
+                               temporaryRestoreInstancePath,
+                               cancelStreamRegistry);
+               }
+
+               // since we transferred all remote state to a local directory, 
we can use the same code as for
+               // local recovery.
+               return new IncrementalLocalKeyedStateHandle(
+                       restoreStateHandle.getBackendIdentifier(),
                        restoreStateHandle.getCheckpointId(),
-                       restoreStateHandle.getSharedStateHandleIDs());
-               lastCompletedCheckpointId = 
restoreStateHandle.getCheckpointId();
-               // 4. Register column family handles
-               registerCFHandles(prepareResult.getStateMetaInfoSnapshots());
+                       new DirectoryStateHandle(temporaryRestoreInstancePath),
+                       restoreStateHandle.getKeyGroupRange(),
+                       restoreStateHandle.getMetaStateHandle(),
+                       restoreStateHandle.getSharedState().keySet());
        }
 
-       private void cleanUpPathQuietly(Path path) {
+       private void cleanUpPathQuietly(@Nonnull Path path) {
                try {
                        FileSystem fileSystem = path.getFileSystem();
                        if (fileSystem.exists(path)) {
@@ -203,7 +258,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                }
        }
 
-       private void registerCFHandles(List<StateMetaInfoSnapshot> 
metaInfoSnapshots)
+       private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> 
metaInfoSnapshots)
                throws BackendBuildingException {
                // Register CF handlers
                for (int i = 0; i < metaInfoSnapshots.size(); ++i) {
@@ -219,65 +274,26 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                }
        }
 
-       private RocksDBIncrementalRestorePrepareResult prepareFiles(
-               KeyedStateHandle rawStateHandle,
-               Path restoreInstancePath) throws BackendBuildingException {
-               final IncrementalLocalKeyedStateHandle localKeyedStateHandle;
-               try {
-                       if (rawStateHandle instanceof 
IncrementalKeyedStateHandle) {
-                               IncrementalKeyedStateHandle restoreStateHandle 
= (IncrementalKeyedStateHandle) rawStateHandle;
-
-                               // read state data.
-                               try (RocksDBStateDownloader 
rocksDBStateDownloader =
-                                               new 
RocksDBStateDownloader(numberOfTransferringThreads)) {
-                                       
rocksDBStateDownloader.transferAllStateDataToDirectory(
-                                               restoreStateHandle,
-                                               restoreInstancePath,
-                                               cancelStreamRegistry);
-                               }
-
-                               // since we transferred all remote state to a 
local directory, we can use the same code as for
-                               // local recovery.
-                               localKeyedStateHandle = new 
IncrementalLocalKeyedStateHandle(
-                                       
restoreStateHandle.getBackendIdentifier(),
-                                       restoreStateHandle.getCheckpointId(),
-                                       new 
DirectoryStateHandle(restoreInstancePath),
-                                       restoreStateHandle.getKeyGroupRange(),
-                                       restoreStateHandle.getMetaStateHandle(),
-                                       
restoreStateHandle.getSharedState().keySet());
-                       } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
-                               // Recovery from local incremental state.
-                               localKeyedStateHandle = 
(IncrementalLocalKeyedStateHandle) rawStateHandle;
-                       } else {
-                               throw new IllegalStateException("Unexpected 
state handle type, " +
-                                       "expected " + 
IncrementalKeyedStateHandle.class + " or " + 
IncrementalLocalKeyedStateHandle.class +
-                                       ", but found " + 
rawStateHandle.getClass());
-                       }
-
-                       KeyedBackendSerializationProxy<K> serializationProxy = 
readMetaData(localKeyedStateHandle.getMetaDataState());
-                       List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = 
serializationProxy.getStateMetaInfoSnapshots();
-                       columnFamilyDescriptors = 
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
-                       columnFamilyHandles = new 
ArrayList<>(columnFamilyDescriptors.size() + 1);
-                       return new 
RocksDBIncrementalRestorePrepareResult(stateMetaInfoSnapshots, 
localKeyedStateHandle);
-               } catch (Exception e) {
-                       // clean up
-                       cleanUpPathQuietly(restoreInstancePath);
-                       // log and rethrow
-                       String errMsg = "Failed to prepare files for restore 
from incremental state handle.";
-                       LOG.error(errMsg, e);
-                       throw new BackendBuildingException(errMsg, e);
-               }
-       }
-
        /**
         * Recovery from multi incremental states with rescaling. For 
rescaling, this method creates a temporary
         * RocksDB instance for a key-groups shard. All contents from the 
temporary instance are copied into the
         * real restore instance and then the temporary instance is discarded.
         */
        private void restoreWithRescaling(Collection<KeyedStateHandle> 
restoreStateHandles) throws Exception {
-               // Init DB instance
-               initDBWithRescaling(restoreStateHandles);
-               // Do restore
+
+               // Prepare for restore with rescaling
+               KeyedStateHandle initialHandle = 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+                       restoreStateHandles, keyGroupRange);
+
+               // Init base DB instance
+               if (initialHandle != null) {
+                       restoreStateHandles.remove(initialHandle);
+                       initDBWithRescaling(initialHandle);
+               } else {
+                       openDB();
+               }
+
+               // Transfer remaining key-groups from temporary instance into 
base DB
                byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
                
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(),
 startKeyGroupPrefixBytes);
 
@@ -333,35 +349,25 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                }
        }
 
-       private void initDBWithRescaling(Collection<KeyedStateHandle> 
restoreStateHandles) throws IOException {
-               // Prepare for restore with rescaling
-               IncrementalKeyedStateHandle initialHandle =
-                       (IncrementalKeyedStateHandle) 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                               restoreStateHandles, keyGroupRange);
-               Path instancePath = new Path(dbPath);
-               if (initialHandle != null) {
-                       restoreStateHandles.remove(initialHandle);
-                       // 1. Download target files
-                       RocksDBIncrementalRestorePrepareResult prepareResult = 
prepareFiles(initialHandle, instancePath);
-                       // 2. Open db instance
-                       openDB();
-                       // 3. Clip the DB instance
-                       try {
-                               
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
-                                       db,
-                                       columnFamilyHandles,
-                                       keyGroupRange,
-                                       initialHandle.getKeyGroupRange(),
-                                       keyGroupPrefixBytes);
-                       } catch (RocksDBException e) {
-                               String errMsg = "Failed to clip DB after 
initialization.";
-                               LOG.error(errMsg, e);
-                               throw new BackendBuildingException(errMsg, e);
-                       }
-                       // 4. Register column family handles
-                       
registerCFHandles(prepareResult.getStateMetaInfoSnapshots());
-               } else {
-                       openDB();
+       private void initDBWithRescaling(KeyedStateHandle initialHandle) throws 
Exception {
+
+               assert (initialHandle instanceof IncrementalKeyedStateHandle);
+
+               // 1. Restore base DB from selected initial handle
+               restoreFromRemoteState((IncrementalKeyedStateHandle) 
initialHandle);
+
+               // 2. Clip the base DB instance
+               try {
+                       
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
+                               db,
+                               columnFamilyHandles,
+                               keyGroupRange,
+                               initialHandle.getKeyGroupRange(),
+                               keyGroupPrefixBytes);
+               } catch (RocksDBException e) {
+                       String errMsg = "Failed to clip DB after 
initialization.";
+                       LOG.error(errMsg, e);
+                       throw new BackendBuildingException(errMsg, e);
                }
        }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestorePrepareResult.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestorePrepareResult.java
deleted file mode 100644
index 414ccfa..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestorePrepareResult.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.restore;
-
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-
-import java.util.List;
-
-/**
- * Entity holding restore preparation result for RocksDB incremental restore.
- */
-public class RocksDBIncrementalRestorePrepareResult {
-       private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-       private final IncrementalLocalKeyedStateHandle localKeyedStateHandle;
-
-       public 
RocksDBIncrementalRestorePrepareResult(List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots,
-                                                                               
                IncrementalLocalKeyedStateHandle localKeyedStateHandle) {
-               this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-               this.localKeyedStateHandle = localKeyedStateHandle;
-       }
-
-       public List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots() {
-               return stateMetaInfoSnapshots;
-       }
-
-       public IncrementalLocalKeyedStateHandle getLocalKeyedStateHandle() {
-               return localKeyedStateHandle;
-       }
-}

Reply via email to