This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8be1058 [FLINK-23003][runtime] Fix resource leak in RocksIncrementalSnapshotStrategy
8be1058 is described below
commit 8be1058a60565587b465a2237136dbbbb4c168f3
Author: leiyanfei <[email protected]>
AuthorDate: Fri Jun 11 17:20:48 2021 +0800
[FLINK-23003][runtime] Fix resource leak in RocksIncrementalSnapshotStrategy
---
.../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java | 1 +
.../contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java | 3 ++-
.../streaming/state/snapshot/RocksDBSnapshotStrategyBase.java | 5 ++++-
.../contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java | 5 +++++
.../streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java | 5 +++++
5 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 499493b..5ec7f41 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -465,6 +465,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
cleanInstanceBasePath();
}
+ IOUtils.closeQuietly(checkpointSnapshotStrategy);
this.disposed = true;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 5eb0be0..246677d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -265,7 +265,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
ResourceGuard rocksDBResourceGuard = new ResourceGuard();
- RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy;
+ RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
PriorityQueueSetFactory priorityQueueFactory;
SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
// Number of bytes required to prefix the key groups.
@@ -363,6 +363,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
IOUtils.closeQuietly(optionsContainer);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
+ IOUtils.closeQuietly(checkpointStrategy);
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (Exception ex) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index 74311ef..b064fc5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -43,7 +43,7 @@ import java.util.LinkedHashMap;
* @param <K> type of the backend keys.
*/
public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources>
- implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R> {
+ implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
@@ -92,4 +92,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
public String getDescription() {
return description;
}
+
+ @Override
+ public abstract void close();
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
index 8812165..8e3063a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -141,6 +141,11 @@ public class RocksFullSnapshotStrategy<K>
// nothing to do.
}
+ @Override
+ public void close() {
+ // nothing to do.
+ }
+
private SupplierWithException<CheckpointStreamWithResultProvider, Exception>
createCheckpointStreamSupplier(
long checkpointId,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 682a3f7..99f0b6f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -204,6 +204,11 @@ public class RocksIncrementalSnapshotStrategy<K>
}
}
+ @Override
+ public void close() {
+ stateUploader.close();
+ }
+
@Nonnull
private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId)
throws IOException {