This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 6addd1e  [FLINK-19013][state-backends] Add start/end logs for state 
restoration
6addd1e is described below

commit 6addd1ea75f002e3a770d9e0da727edb039f8fd1
Author: Yun Tang <[email protected]>
AuthorDate: Wed Dec 16 01:30:23 2020 +0800

    [FLINK-19013][state-backends] Add start/end logs for state restoration
---
 .../runtime/state/AbstractKeyedStateBackendBuilder.java   |  5 +++++
 .../runtime/state/heap/HeapKeyedStateBackendBuilder.java  |  1 +
 .../flink/runtime/state/heap/HeapRestoreOperation.java    |  6 ++++++
 .../streaming/state/RocksDBKeyedStateBackendBuilder.java  |  8 +++-----
 .../state/restore/AbstractRocksDBRestoreOperation.java    |  4 ++++
 .../state/restore/RocksDBFullRestoreOperation.java        |  2 ++
 .../state/restore/RocksDBIncrementalRestoreOperation.java | 15 ++++++++-------
 7 files changed, 29 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
index d7573fd..a9038b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
@@ -24,6 +24,9 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
 import java.util.Collection;
@@ -33,6 +36,8 @@ import java.util.Collection;
  */
 public abstract class AbstractKeyedStateBackendBuilder<K>
        implements StateBackendBuilder<AbstractKeyedStateBackend, 
BackendBuildingException> {
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
        protected final TaskKvStateRegistry kvStateRegistry;
        protected final StateSerializerProvider<K> keySerializerProvider;
        protected final ClassLoader userCodeClassLoader;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 7d9430a..64a5bb3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -112,6 +112,7 @@ public class HeapKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
                        keyContext);
                try {
                        restoreOperation.restore();
+                       logger.info("Finished to build heap keyed 
state-backend.");
                } catch (Exception e) {
                        throw new BackendBuildingException("Failed when trying 
to restore heap backend", e);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index a4beb8f..3fe5e57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -46,6 +46,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
 
 import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -65,6 +67,8 @@ import static 
org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleExce
  * @param <K> The data type that the serializer serializes.
  */
 public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(HeapRestoreOperation.class);
+
        private final Collection<KeyedStateHandle> restoreStateHandles;
        private final StateSerializerProvider<K> keySerializerProvider;
        private final ClassLoader userCodeClassLoader;
@@ -122,6 +126,7 @@ public class HeapRestoreOperation<K> implements 
RestoreOperation<Void> {
                                throw 
unexpectedStateHandleException(KeyGroupsStateHandle.class, 
keyedStateHandle.getClass());
                        }
 
+                       LOG.info("Starting to restore from state handle: {}.", 
keyedStateHandle);
                        KeyGroupsStateHandle keyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
                        FSDataInputStream fsDataInputStream = 
keyGroupsStateHandle.openInputStream();
                        
cancelStreamRegistry.registerCloseable(fsDataInputStream);
@@ -164,6 +169,7 @@ public class HeapRestoreOperation<K> implements 
RestoreOperation<Void> {
                                        kvStatesById, restoredMetaInfos.size(),
                                        serializationProxy.getReadVersion(),
                                        
serializationProxy.isUsingKeyGroupCompression());
+                               LOG.info("Finished restoring from state handle: 
{}.", keyedStateHandle);
                        } finally {
                                if 
(cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
                                        IOUtils.closeQuietly(fsDataInputStream);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 85b9e27..5d44248 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -55,8 +55,6 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
@@ -82,7 +80,6 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBuilder<K> {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class);
        static final String DB_INSTANCE_DIR_STRING = "db";
 
        /** String that identifies the operator that owns this backend. */
@@ -317,14 +314,14 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        try {
                                FileUtils.deleteDirectory(instanceBasePath);
                        } catch (Exception ex) {
-                               LOG.warn("Failed to instance base path for 
RocksDB: " + instanceBasePath, ex);
+                               logger.warn("Failed to delete base path for 
RocksDB: " + instanceBasePath, ex);
                        }
                        // Log and rethrow
                        if (e instanceof BackendBuildingException) {
                                throw (BackendBuildingException) e;
                        } else {
                                String errMsg = "Caught unexpected exception.";
-                               LOG.error(errMsg, e);
+                               logger.error(errMsg, e);
                                throw new BackendBuildingException(errMsg, e);
                        }
                }
@@ -332,6 +329,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        keyGroupRange,
                        numberOfKeyGroups
                );
+               logger.info("Finished building RocksDB keyed state-backend at 
{}.", instanceBasePath);
                return new RocksDBKeyedStateBackend<>(
                        this.userCodeClassLoader,
                        this.instanceBasePath,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
index dbaa489..e05b1ad 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
@@ -42,6 +42,8 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
@@ -60,6 +62,8 @@ import java.util.function.Function;
  * @param <K> The data type that the serializer serializes.
  */
 public abstract class AbstractRocksDBRestoreOperation<K> implements 
RocksDBRestoreOperation, AutoCloseable {
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
        protected final KeyGroupRange keyGroupRange;
        protected final int keyGroupPrefixBytes;
        protected final int numberOfTransferringThreads;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 785adf6..cb23204 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -162,11 +162,13 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
        private void restoreKeyGroupsInStateHandle()
                throws IOException, StateMigrationException, RocksDBException {
                try {
+                       logger.info("Starting to restore from state handle: 
{}.", currentKeyGroupsStateHandle);
                        currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
                        
cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
                        currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
                        restoreKVStateMetaData();
                        restoreKVStateData();
+                       logger.info("Finished restoring from state handle: 
{}.", currentKeyGroupsStateHandle);
                } finally {
                        if 
(cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
                                
IOUtils.closeQuietly(currentStateHandleInStream);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index c420b0b..cb35428 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -54,8 +54,6 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -86,7 +84,6 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * Encapsulates the process of restoring a RocksDB instance from an 
incremental snapshot.
  */
 public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestoreOperation<K> {
-       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
 
        private final String operatorIdentifier;
        private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
@@ -166,6 +163,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
         */
        @SuppressWarnings("unchecked")
        private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) 
throws Exception {
+               logger.info("Starting to restore from state handle: {} without 
rescaling.", keyedStateHandle);
                if (keyedStateHandle instanceof 
IncrementalRemoteKeyedStateHandle) {
                        IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle =
                                (IncrementalRemoteKeyedStateHandle) 
keyedStateHandle;
@@ -181,6 +179,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                        new 
Class[]{IncrementalRemoteKeyedStateHandle.class, 
IncrementalLocalKeyedStateHandle.class},
                                        keyedStateHandle.getClass());
                }
+               logger.info("Finished restoring from state handle: {} without 
rescaling.", keyedStateHandle);
        }
 
        private void 
restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle 
localKeyedStateHandle) {
@@ -210,12 +209,12 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
 
                Path restoreSourcePath = 
localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
 
-               LOG.debug("Restoring keyed backend uid in operator {} from 
incremental snapshot to {}.",
+               logger.debug("Restoring keyed backend uid in operator {} from 
incremental snapshot to {}.",
                        operatorIdentifier, backendUID);
 
                if (!instanceRocksDBPath.mkdirs()) {
                        String errMsg = "Could not create RocksDB data 
directory: " + instanceBasePath.getAbsolutePath();
-                       LOG.error(errMsg);
+                       logger.error(errMsg);
                        throw new IOException(errMsg);
                }
 
@@ -252,7 +251,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                try {
                        FileUtils.deleteDirectory(path.toFile());
                } catch (IOException ex) {
-                       LOG.warn("Failed to clean up path " + path, ex);
+                       logger.warn("Failed to clean up path " + path, ex);
                }
        }
 
@@ -295,6 +294,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                throw 
unexpectedStateHandleException(IncrementalRemoteKeyedStateHandle.class, 
rawStateHandle.getClass());
                        }
 
+                       logger.info("Starting to restore from state handle: {} 
with rescaling.", rawStateHandle);
                        Path temporaryRestoreInstancePath = 
instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
                        try (RestoredDBInstance tmpRestoreDBInfo = 
restoreDBInstanceFromStateHandle(
                                (IncrementalRemoteKeyedStateHandle) 
rawStateHandle,
@@ -330,6 +330,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                                }
                                        } // releases native iterator resources
                                }
+                               logger.info("Finished restoring from state 
handle: {} with rescaling.", rawStateHandle);
                        } finally {
                                
cleanUpPathQuietly(temporaryRestoreInstancePath);
                        }
@@ -354,7 +355,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                writeBatchSize);
                } catch (RocksDBException e) {
                        String errMsg = "Failed to clip DB after 
initialization.";
-                       LOG.error(errMsg, e);
+                       logger.error(errMsg, e);
                        throw new BackendBuildingException(errMsg, e);
                }
        }

Reply via email to