Repository: flink
Updated Branches:
  refs/heads/release-1.5 d3395e02e -> 764bafdd4


[FLINK-8968][state] Pull the creation of readOptions out of the loop to avoid a native resource leak.

(cherry picked from commit f5071d7)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/340ee26f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/340ee26f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/340ee26f

Branch: refs/heads/release-1.5
Commit: 340ee26f1660afe23d6653d34885256c8aada764
Parents: d3395e0
Author: sihuazhou <summerle...@163.com>
Authored: Thu Mar 15 16:57:11 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Apr 6 12:33:33 2018 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java   | 9 +++++----
 .../streaming/state/RocksDBStateBackendConfigTest.java      | 8 ++++++--
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/340ee26f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6a23181..cdeb608 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1952,15 +1952,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        this.kvStateIterators = new 
ArrayList<>(kvStateInformationCopy.size());
 
                        int kvStateId = 0;
+
+                       //retrieve iterator for this k/v states
+                       readOptions = new ReadOptions();
+                       readOptions.setSnapshot(snapshot);
+
                        for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
                                kvStateInformationCopy) {
 
                                metaInfoSnapshots.add(column.f1.snapshot());
 
-                               //retrieve iterator for this k/v states
-                               readOptions = new ReadOptions();
-                               readOptions.setSnapshot(snapshot);
-
                                kvStateIterators.add(
                                        new 
Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/340ee26f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 2dd67f5..65d5b2e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -299,7 +299,9 @@ public class RocksDBStateBackendConfigTest {
                });
 
                assertNotNull(rocksDbBackend.getOptions());
-               assertEquals(CompactionStyle.FIFO, 
rocksDbBackend.getColumnOptions().compactionStyle());
+               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.FIFO, 
colCreated.compactionStyle());
+               }
        }
 
        @Test
@@ -324,7 +326,9 @@ public class RocksDBStateBackendConfigTest {
 
                assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
                assertNotNull(rocksDbBackend.getOptions());
-               assertEquals(CompactionStyle.UNIVERSAL, 
rocksDbBackend.getColumnOptions().compactionStyle());
+               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.UNIVERSAL, 
colCreated.compactionStyle());
+               }
        }
 
        @Test

Reply via email to