rmetzger commented on a change in pull request #14394:
URL: https://github.com/apache/flink/pull/14394#discussion_r544037229
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
##########
@@ -164,6 +169,7 @@ public Void restore() throws Exception {
kvStatesById, restoredMetaInfos.size(),
serializationProxy.getReadVersion(),
serializationProxy.isUsingKeyGroupCompression());
+ LOG.info("Finish to restore from state handle:
{}.", keyedStateHandle);
Review comment:
```suggestion
LOG.info("Finished restoring from state handle:
{}.", keyedStateHandle);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
##########
@@ -112,6 +112,7 @@ public HeapKeyedStateBackendBuilder(
keyContext);
try {
restoreOperation.restore();
+ logger.info("Finish to build heap keyed
state-backend.");
Review comment:
```suggestion
logger.info("Finished to build heap keyed
state-backend.");
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -162,11 +162,13 @@ public RocksDBRestoreResult restore()
private void restoreKeyGroupsInStateHandle()
throws IOException, StateMigrationException, RocksDBException {
try {
+ logger.info("Start to restore from state handle: {}.",
currentKeyGroupsStateHandle);
currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
restoreKVStateData();
+ logger.info("Finish to restore from state handle: {}.",
currentKeyGroupsStateHandle);
Review comment:
```suggestion
logger.info("Finished restoring from state handle:
{}.", currentKeyGroupsStateHandle);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
##########
@@ -122,6 +126,7 @@ public Void restore() throws Exception {
throw
unexpectedStateHandleException(KeyGroupsStateHandle.class,
keyedStateHandle.getClass());
}
+ LOG.info("Start to restore from state handle: {}.",
keyedStateHandle);
Review comment:
```suggestion
LOG.info("Starting to restore from state handle: {}.",
keyedStateHandle);
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -317,21 +314,22 @@ private static void checkAndCreateDirectory(File
directory) throws IOException {
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (Exception ex) {
- LOG.warn("Failed to instance base path for
RocksDB: " + instanceBasePath, ex);
+ logger.warn("Failed to instance base path for
RocksDB: " + instanceBasePath, ex);
Review comment:
```suggestion
logger.warn("Failed to delete base path for
RocksDB: " + instanceBasePath, ex);
```
Not sure, maybe I don't understand what the code is doing here.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -317,21 +314,22 @@ private static void checkAndCreateDirectory(File
directory) throws IOException {
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (Exception ex) {
- LOG.warn("Failed to instance base path for
RocksDB: " + instanceBasePath, ex);
+ logger.warn("Failed to instance base path for
RocksDB: " + instanceBasePath, ex);
}
// Log and rethrow
if (e instanceof BackendBuildingException) {
throw (BackendBuildingException) e;
} else {
String errMsg = "Caught unexpected exception.";
- LOG.error(errMsg, e);
+ logger.error(errMsg, e);
throw new BackendBuildingException(errMsg, e);
}
}
InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
keyGroupRange,
numberOfKeyGroups
);
+ logger.info("Finish to build RocksDB keyed state-backend at
{}.", instanceBasePath);
Review comment:
```suggestion
logger.info("Finished building RocksDB keyed state-backend at
{}.", instanceBasePath);
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
##########
@@ -330,6 +330,7 @@ private void
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
}
} // releases native iterator resources
}
+ logger.info("Finish to restore from state
handle: {} with rescaling.", rawStateHandle);
Review comment:
```suggestion
logger.info("Finished restoring from state
handle: {} with rescaling.", rawStateHandle);
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -162,11 +162,13 @@ public RocksDBRestoreResult restore()
private void restoreKeyGroupsInStateHandle()
throws IOException, StateMigrationException, RocksDBException {
try {
+ logger.info("Start to restore from state handle: {}.",
currentKeyGroupsStateHandle);
Review comment:
```suggestion
logger.info("Starting to restore from state handle:
{}.", currentKeyGroupsStateHandle);
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
##########
@@ -295,6 +294,7 @@ private void
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
throw
unexpectedStateHandleException(IncrementalRemoteKeyedStateHandle.class,
rawStateHandle.getClass());
}
+ logger.info("Start to restore from state handle: {}
with rescaling.", rawStateHandle);
Review comment:
```suggestion
logger.info("Starting to restore from state handle: {}
with rescaling.", rawStateHandle);
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]