This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 6addd1e [FLINK-19013][state-backends] Add start/end logs for state restoration
6addd1e is described below
commit 6addd1ea75f002e3a770d9e0da727edb039f8fd1
Author: Yun Tang <[email protected]>
AuthorDate: Wed Dec 16 01:30:23 2020 +0800
[FLINK-19013][state-backends] Add start/end logs for state restoration
---
.../runtime/state/AbstractKeyedStateBackendBuilder.java | 5 +++++
.../runtime/state/heap/HeapKeyedStateBackendBuilder.java | 1 +
.../flink/runtime/state/heap/HeapRestoreOperation.java | 6 ++++++
.../streaming/state/RocksDBKeyedStateBackendBuilder.java | 8 +++-----
.../state/restore/AbstractRocksDBRestoreOperation.java | 4 ++++
.../state/restore/RocksDBFullRestoreOperation.java | 2 ++
.../state/restore/RocksDBIncrementalRestoreOperation.java | 15 ++++++++-------
7 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
index d7573fd..a9038b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
@@ -24,6 +24,9 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nonnull;
import java.util.Collection;
@@ -33,6 +36,8 @@ import java.util.Collection;
*/
public abstract class AbstractKeyedStateBackendBuilder<K>
implements StateBackendBuilder<AbstractKeyedStateBackend,
BackendBuildingException> {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
protected final TaskKvStateRegistry kvStateRegistry;
protected final StateSerializerProvider<K> keySerializerProvider;
protected final ClassLoader userCodeClassLoader;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 7d9430a..64a5bb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -112,6 +112,7 @@ public class HeapKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBu
keyContext);
try {
restoreOperation.restore();
+ logger.info("Finished to build heap keyed
state-backend.");
} catch (Exception e) {
throw new BackendBuildingException("Failed when trying
to restore heap backend", e);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index a4beb8f..3fe5e57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -46,6 +46,8 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
@@ -65,6 +67,8 @@ import static
org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleExce
* @param <K> The data type that the serializer serializes.
*/
public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HeapRestoreOperation.class);
+
private final Collection<KeyedStateHandle> restoreStateHandles;
private final StateSerializerProvider<K> keySerializerProvider;
private final ClassLoader userCodeClassLoader;
@@ -122,6 +126,7 @@ public class HeapRestoreOperation<K> implements
RestoreOperation<Void> {
throw
unexpectedStateHandleException(KeyGroupsStateHandle.class,
keyedStateHandle.getClass());
}
+ LOG.info("Starting to restore from state handle: {}.",
keyedStateHandle);
KeyGroupsStateHandle keyGroupsStateHandle =
(KeyGroupsStateHandle) keyedStateHandle;
FSDataInputStream fsDataInputStream =
keyGroupsStateHandle.openInputStream();
cancelStreamRegistry.registerCloseable(fsDataInputStream);
@@ -164,6 +169,7 @@ public class HeapRestoreOperation<K> implements
RestoreOperation<Void> {
kvStatesById, restoredMetaInfos.size(),
serializationProxy.getReadVersion(),
serializationProxy.isUsingKeyGroupCompression());
+ LOG.info("Finished restoring from state handle:
{}.", keyedStateHandle);
} finally {
if
(cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
IOUtils.closeQuietly(fsDataInputStream);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 85b9e27..5d44248 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -55,8 +55,6 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
@@ -82,7 +80,6 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
*/
public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBuilder<K> {
- private static final Logger LOG =
LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class);
static final String DB_INSTANCE_DIR_STRING = "db";
/** String that identifies the operator that owns this backend. */
@@ -317,14 +314,14 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (Exception ex) {
- LOG.warn("Failed to instance base path for
RocksDB: " + instanceBasePath, ex);
+ logger.warn("Failed to delete base path for
RocksDB: " + instanceBasePath, ex);
}
// Log and rethrow
if (e instanceof BackendBuildingException) {
throw (BackendBuildingException) e;
} else {
String errMsg = "Caught unexpected exception.";
- LOG.error(errMsg, e);
+ logger.error(errMsg, e);
throw new BackendBuildingException(errMsg, e);
}
}
@@ -332,6 +329,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
keyGroupRange,
numberOfKeyGroups
);
+ logger.info("Finished building RocksDB keyed state-backend at
{}.", instanceBasePath);
return new RocksDBKeyedStateBackend<>(
this.userCodeClassLoader,
this.instanceBasePath,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
index dbaa489..e05b1ad 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
@@ -42,6 +42,8 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
@@ -60,6 +62,8 @@ import java.util.function.Function;
* @param <K> The data type that the serializer serializes.
*/
public abstract class AbstractRocksDBRestoreOperation<K> implements
RocksDBRestoreOperation, AutoCloseable {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
protected final KeyGroupRange keyGroupRange;
protected final int keyGroupPrefixBytes;
protected final int numberOfTransferringThreads;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 785adf6..cb23204 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -162,11 +162,13 @@ public class RocksDBFullRestoreOperation<K> extends
AbstractRocksDBRestoreOperat
private void restoreKeyGroupsInStateHandle()
throws IOException, StateMigrationException, RocksDBException {
try {
+ logger.info("Starting to restore from state handle:
{}.", currentKeyGroupsStateHandle);
currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
restoreKVStateData();
+ logger.info("Finished restoring from state handle:
{}.", currentKeyGroupsStateHandle);
} finally {
if
(cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
IOUtils.closeQuietly(currentStateHandleInStream);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index c420b0b..cb35428 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -54,8 +54,6 @@ import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
@@ -86,7 +84,6 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
* Encapsulates the process of restoring a RocksDB instance from an
incremental snapshot.
*/
public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestoreOperation<K> {
- private static final Logger LOG =
LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
private final String operatorIdentifier;
private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
@@ -166,6 +163,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestor
*/
@SuppressWarnings("unchecked")
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle)
throws Exception {
+ logger.info("Starting to restore from state handle: {} without
rescaling.", keyedStateHandle);
if (keyedStateHandle instanceof
IncrementalRemoteKeyedStateHandle) {
IncrementalRemoteKeyedStateHandle
incrementalRemoteKeyedStateHandle =
(IncrementalRemoteKeyedStateHandle)
keyedStateHandle;
@@ -181,6 +179,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestor
new
Class[]{IncrementalRemoteKeyedStateHandle.class,
IncrementalLocalKeyedStateHandle.class},
keyedStateHandle.getClass());
}
+ logger.info("Finished restoring from state handle: {} without
rescaling.", keyedStateHandle);
}
private void
restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle
localKeyedStateHandle) {
@@ -210,12 +209,12 @@ public class RocksDBIncrementalRestoreOperation<K>
extends AbstractRocksDBRestor
Path restoreSourcePath =
localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
- LOG.debug("Restoring keyed backend uid in operator {} from
incremental snapshot to {}.",
+ logger.debug("Restoring keyed backend uid in operator {} from
incremental snapshot to {}.",
operatorIdentifier, backendUID);
if (!instanceRocksDBPath.mkdirs()) {
String errMsg = "Could not create RocksDB data
directory: " + instanceBasePath.getAbsolutePath();
- LOG.error(errMsg);
+ logger.error(errMsg);
throw new IOException(errMsg);
}
@@ -252,7 +251,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestor
try {
FileUtils.deleteDirectory(path.toFile());
} catch (IOException ex) {
- LOG.warn("Failed to clean up path " + path, ex);
+ logger.warn("Failed to clean up path " + path, ex);
}
}
@@ -295,6 +294,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestor
throw
unexpectedStateHandleException(IncrementalRemoteKeyedStateHandle.class,
rawStateHandle.getClass());
}
+ logger.info("Starting to restore from state handle: {}
with rescaling.", rawStateHandle);
Path temporaryRestoreInstancePath =
instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
try (RestoredDBInstance tmpRestoreDBInfo =
restoreDBInstanceFromStateHandle(
(IncrementalRemoteKeyedStateHandle)
rawStateHandle,
@@ -330,6 +330,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestor
}
} // releases native iterator resources
}
+ logger.info("Finished restoring from state
handle: {} with rescaling.", rawStateHandle);
} finally {
cleanUpPathQuietly(temporaryRestoreInstancePath);
}
@@ -354,7 +355,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends
AbstractRocksDBRestor
writeBatchSize);
} catch (RocksDBException e) {
String errMsg = "Failed to clip DB after
initialization.";
- LOG.error(errMsg, e);
+ logger.error(errMsg, e);
throw new BackendBuildingException(errMsg, e);
}
}