[ 
https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497725#comment-16497725
 ] 

ASF GitHub Bot commented on FLINK-8790:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5582#discussion_r192330418
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -795,20 +806,241 @@ private void restoreInstance(
                }
     
                /**
    -            * Recovery from local incremental state.
    +            * Recovery from multi incremental states.
    +            * In case of rescaling, this method creates a temporary 
RocksDB instance for a key-groups shard. All contents
    +            * from the temporary instance are copied into the real restore 
instance and then the temporary instance is
    +            * discarded.
                 */
    -           private void restoreInstance(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) throws Exception {
    +           void restoreFromMultiHandles(Collection<KeyedStateHandle> 
restoreStateHandles) throws Exception {
    +
    +                   KeyGroupRange targetKeyGroupRange = 
stateBackend.keyGroupRange;
    +
    +                   chooseTheBestStateHandleToInit(restoreStateHandles, 
targetKeyGroupRange);
    +
    +                   int targetStartKeyGroup = 
stateBackend.getKeyGroupRange().getStartKeyGroup();
    +                   byte[] targetStartKeyGroupPrefixBytes = new 
byte[stateBackend.keyGroupPrefixBytes];
    +                   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; 
++j) {
    +                           targetStartKeyGroupPrefixBytes[j] = (byte) 
(targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * 
Byte.SIZE));
    +                   }
    +
    +                   for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
    +
    +                           if (!(rawStateHandle instanceof 
IncrementalKeyedStateHandle)) {
    +                                   throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                           "expected " + 
IncrementalKeyedStateHandle.class +
    +                                           ", but found " + 
rawStateHandle.getClass());
    +                           }
    +
    +                           Path temporaryRestoreInstancePath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath() + 
UUID.randomUUID().toString());
    +                           try (RestoredDBInstance tmpRestoreDBInfo = 
restoreDBFromStateHandle(
    +                                           (IncrementalKeyedStateHandle) 
rawStateHandle,
    +                                           temporaryRestoreInstancePath,
    +                                           targetKeyGroupRange,
    +                                           
stateBackend.keyGroupPrefixBytes,
    +                                           false);
    +                                   RocksDBWriteBatchWrapper 
writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {
    +
    +                                   List<ColumnFamilyDescriptor> 
tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
    +                                   List<ColumnFamilyHandle> 
tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
    +
    +                                   int startKeyGroup = 
stateBackend.getKeyGroupRange().getStartKeyGroup();
    +                                   byte[] startKeyGroupPrefixBytes = new 
byte[stateBackend.keyGroupPrefixBytes];
    +                                   for (int j = 0; j < 
stateBackend.keyGroupPrefixBytes; ++j) {
    +                                           startKeyGroupPrefixBytes[j] = 
(byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * 
Byte.SIZE));
    +                                   }
    +
    +                                   // iterating only the requested 
descriptors automatically skips the default column family handle
    +                                   for (int i = 0; i < 
tmpColumnFamilyDescriptors.size(); ++i) {
    +                                           ColumnFamilyHandle 
tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
    +                                           ColumnFamilyDescriptor 
tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);
    +
    +                                           ColumnFamilyHandle 
targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle(
    +                                                   
tmpColumnFamilyDescriptor, null, 
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
    +
    +                                           try (RocksIteratorWrapper 
iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
    +
    +                                                   
iterator.seek(startKeyGroupPrefixBytes);
    +
    +                                                   while 
(iterator.isValid()) {
    +
    +                                                           int keyGroup = 
0;
    +                                                           for (int j = 0; 
j < stateBackend.keyGroupPrefixBytes; ++j) {
    +                                                                   
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
    +                                                           }
    +
    +                                                           if 
(stateBackend.keyGroupRange.contains(keyGroup)) {
    +                                                                   
writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), 
iterator.value());
    +                                                           } else {
    +                                                                   // 
Since the iterator will visit the record according to the sorted order,
    +                                                                   // we 
can just break here.
    +                                                                   break;
    +                                                           }
    +
    +                                                           iterator.next();
    +                                                   }
    +                                           } // releases native iterator 
resources
    +                                   }
    +                           } finally {
    +                                   FileSystem restoreFileSystem = 
temporaryRestoreInstancePath.getFileSystem();
    +                                   if 
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
    +                                           
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
    +                                   }
    +                           }
    +                   }
    +           }
    +
    +           private class RestoredDBInstance implements AutoCloseable {
    +
    +                   @Nonnull
    +                   private final RocksDB db;
    +
    +                   @Nonnull
    +                   private final ColumnFamilyHandle 
defaultColumnFamilyHandle;
    +
    +                   @Nonnull
    +                   private final List<ColumnFamilyHandle> 
columnFamilyHandles;
    +
    +                   @Nonnull
    +                   private final List<ColumnFamilyDescriptor> 
columnFamilyDescriptors;
    +
    +                   @Nonnull
    +                   private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
    +
    +                   public RestoredDBInstance(@Nonnull RocksDB db,
    +                                                           @Nonnull 
List<ColumnFamilyHandle> columnFamilyHandles,
    +                                                           @Nonnull 
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
    +                                                           @Nonnull 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots) {
    +                           this.db = db;
    +                           this.columnFamilyHandles = columnFamilyHandles;
    +                           this.defaultColumnFamilyHandle = 
this.columnFamilyHandles.remove(0);
    +                           this.columnFamilyDescriptors = 
columnFamilyDescriptors;
    +                           this.stateMetaInfoSnapshots = 
stateMetaInfoSnapshots;
    +                   }
    +
    +                   @Override
    +                   public void close() throws Exception {
    +
    +                           IOUtils.closeQuietly(defaultColumnFamilyHandle);
    +
    +                           for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
    +                                   
IOUtils.closeQuietly(columnFamilyHandle);
    +                           }
    +
    +                           IOUtils.closeQuietly(db);
    +                   }
    +           }
    +
    +           private RestoredDBInstance restoreDBFromStateHandle(
    +                   IncrementalKeyedStateHandle restoreStateHandle,
    +                   Path temporaryRestoreInstancePath,
    +                   KeyGroupRange targetKeyGroupRange,
    +                   int keyGroupPrefixBytes,
    +                   boolean needClip) throws Exception {
    +
    +                   transferAllStateDataToDirectory(restoreStateHandle, 
temporaryRestoreInstancePath);
    +
                        // read meta data
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots =
    -                           
readMetaData(localKeyedStateHandle.getMetaDataState());
    +                           
readMetaData(restoreStateHandle.getMetaStateHandle());
     
                        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                                
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
     
    -                   restoreLocalStateIntoFullInstance(
    -                           localKeyedStateHandle,
    +                   List<ColumnFamilyHandle> columnFamilyHandles =
    +                           new ArrayList<>(stateMetaInfoSnapshots.size() + 
1);
    +
    +                   RocksDB restoreDb = stateBackend.openDB(
    +                           temporaryRestoreInstancePath.getPath(),
                                columnFamilyDescriptors,
    -                           stateMetaInfoSnapshots);
    +                           columnFamilyHandles);
    +
    +                   if (needClip) {
    +                           
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
    +                                   restoreDb,
    +                                   columnFamilyHandles,
    +                                   targetKeyGroupRange,
    +                                   restoreStateHandle.getKeyGroupRange(),
    +                                   keyGroupPrefixBytes);
    +                   }
    +
    +                   return new RestoredDBInstance(restoreDb, 
columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
    +           }
    +
    +           private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
    +                   ColumnFamilyDescriptor columnFamilyDescriptor,
    +                   ColumnFamilyHandle columnFamilyHandle,
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
stateMetaInfoSnapshot) throws RocksDBException {
    +
    +                   Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
    +                           
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
    +
    +                   if (null == registeredStateMetaInfoEntry) {
    +                           RegisteredKeyedBackendStateMetaInfo<?, ?> 
stateMetaInfo =
    +                                   new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                           
stateMetaInfoSnapshot.getStateType(),
    +                                           stateMetaInfoSnapshot.getName(),
    +                                           
stateMetaInfoSnapshot.getNamespaceSerializer(),
    +                                           
stateMetaInfoSnapshot.getStateSerializer());
    +
    +                           registeredStateMetaInfoEntry =
    +                                   new Tuple2<>(
    +                                           columnFamilyHandle != null ? 
columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +                                           stateMetaInfo);
    +
    +                           stateBackend.kvStateInformation.put(
    +                                   stateMetaInfoSnapshot.getName(),
    +                                   registeredStateMetaInfoEntry);
    +                   }
    +
    +                   return registeredStateMetaInfoEntry.f0;
    +           }
    +
    +           private void chooseTheBestStateHandleToInit(
    --- End diff --
    
    I think the name of this method is no longer accurate: it does not only 
choose the best handle, it also already restores a db instance. Maybe we can 
still break this up into two methods, so that each method only does one thing. 
I think it is not so nice if creating the db is a side effect of a method 
that claims to only find something.


> Improve performance for recovery from incremental checkpoint
> ------------------------------------------------------------
>
>                 Key: FLINK-8790
>                 URL: https://issues.apache.org/jira/browse/FLINK-8790
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.6.0
>
>
> When there are multiple state handles to be restored, we can improve the 
> performance as follows:
> 1. Choose the best state handle to init the target db
> 2. Use the other state handles to create a temp db, and clip the db according 
> to the target key group range (via rocksdb.deleteRange()); this helps us 
> get rid of the `key group check` in 
>  the `data insertion loop` and also helps us get rid of traversing useless 
> records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to