Repository: flink
Updated Branches:
  refs/heads/master 2d872447d -> 7baf7649e


[hotfix] Use try-with-resources to ensure RocksIterator is always closed in 
RocksDBMapState.

This closes #5705.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7baf7649
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7baf7649
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7baf7649

Branch: refs/heads/master
Commit: 7baf7649e9ecd485d6f036b7755c2f98cca74e3a
Parents: 21cf59d
Author: sihuazhou <summerle...@163.com>
Authored: Wed Mar 28 00:43:55 2018 +0800
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 16 +++--
 .../streaming/state/RocksDBMapState.java        | 71 ++++++++++----------
 2 files changed, 47 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7baf7649/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 31b9d99..3000667 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1313,10 +1313,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private static final List<Comparator<MergeIterator>> 
COMPARATORS;
 
                static {
-                       int maxBytes = 4;
+                       int maxBytes = 2;
                        COMPARATORS = new ArrayList<>(maxBytes);
                        for (int i = 0; i < maxBytes; ++i) {
-                               final int currentBytes = i;
+                               final int currentBytes = i + 1;
                                COMPARATORS.add(new Comparator<MergeIterator>() 
{
                                        @Override
                                        public int compare(MergeIterator o1, 
MergeIterator o2) {
@@ -1330,9 +1330,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
                        Preconditions.checkNotNull(kvStateIterators);
+                       Preconditions.checkArgument(keyGroupPrefixByteCount >= 
1);
+
                        this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
 
-                       Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount);
+                       Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount - 1);
 
                        if (kvStateIterators.size() > 0) {
                                PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
@@ -1837,10 +1839,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private Snapshot snapshot;
                private ReadOptions readOptions;
 
-               /** The state meta data. */
+               /**
+                * The state meta data.
+                */
                private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots;
 
-               /** The copied column handle. */
+               /**
+                * The copied column handle.
+                */
                private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
 
                private List<Tuple2<RocksIterator, Integer>> kvStateIterators;

http://git-wip-us.apache.org/repos/asf/flink/blob/7baf7649/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index c75a2ed..baa90fa 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -540,52 +540,53 @@ public class RocksDBMapState<K, N, UK, UV>
                                return;
                        }
 
-                       RocksIterator iterator = db.newIterator(columnFamily);
-
-                       /*
-                        * The iteration starts from the prefix bytes at the 
first loading. The cache then is
-                        * reloaded when the next entry to return is the last 
one in the cache. At that time,
-                        * we will start the iterating from the last returned 
entry.
-                        */
-                       RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? 
null : cacheEntries.get(cacheEntries.size() - 1);
-                       byte[] startBytes = (lastEntry == null ? keyPrefixBytes 
: lastEntry.rawKeyBytes);
-
-                       cacheEntries.clear();
-                       cacheIndex = 0;
-
-                       iterator.seek(startBytes);
-
-                       /*
-                        * If the last returned entry is not deleted, it will 
be the first entry in the
-                        * iterating. Skip it to avoid redundant access in such 
cases.
-                        */
-                       if (lastEntry != null && !lastEntry.deleted) {
-                               iterator.next();
-                       }
-
-                       while (true) {
-                               if (!iterator.isValid() || 
!underSameKey(iterator.key())) {
-                                       expired = true;
-                                       break;
+                       // Use try-with-resources to ensure the RocksIterator is 
released even if a runtime exception
+                       // occurs in the code block below.
+                       try (RocksIterator iterator = 
db.newIterator(columnFamily)) {
+
+                               /*
+                                * The iteration starts from the prefix bytes 
at the first loading. The cache then is
+                                * reloaded when the next entry to return is 
the last one in the cache. At that time,
+                                * we will start the iterating from the last 
returned entry.
+                                */
+                               RocksDBMapEntry lastEntry = cacheEntries.size() 
== 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+                               byte[] startBytes = (lastEntry == null ? 
keyPrefixBytes : lastEntry.rawKeyBytes);
+
+                               cacheEntries.clear();
+                               cacheIndex = 0;
+
+                               iterator.seek(startBytes);
+
+                               /*
+                                * If the last returned entry is not deleted, 
it will be the first entry in the
+                                * iterating. Skip it to avoid redundant access 
in such cases.
+                                */
+                               if (lastEntry != null && !lastEntry.deleted) {
+                                       iterator.next();
                                }
 
-                               if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
-                                       break;
-                               }
+                               while (true) {
+                                       if (!iterator.isValid() || 
!underSameKey(iterator.key())) {
+                                               expired = true;
+                                               break;
+                                       }
 
-                               RocksDBMapEntry entry = new RocksDBMapEntry(
+                                       if (cacheEntries.size() >= 
CACHE_SIZE_LIMIT) {
+                                               break;
+                                       }
+
+                                       RocksDBMapEntry entry = new 
RocksDBMapEntry(
                                                db,
                                                iterator.key(),
                                                iterator.value(),
                                                keySerializer,
                                                valueSerializer);
 
-                               cacheEntries.add(entry);
+                                       cacheEntries.add(entry);
 
-                               iterator.next();
+                                       iterator.next();
+                               }
                        }
-
-                       iterator.close();
                }
 
                private boolean underSameKey(byte[] rawKeyBytes) {

Reply via email to