[hotfix] RocksDB: make the default column family first

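According to the RocksDB documentation, the descriptor of the default column
family must always be the first entry in the list of column family descriptors
that is passed when opening a database. This commit therefore moves the default
column family from the last to the first position.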


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca523fd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca523fd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca523fd5

Branch: refs/heads/master
Commit: ca523fd556d3c8411ca550e20087d18fd3e72a0c
Parents: 0f27116
Author: Stefan Richter <[email protected]>
Authored: Wed Jan 17 13:05:17 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:59:54 2018 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 52 ++++++++++---------
 .../state/RocksDBMergeIteratorTest.java         | 54 ++++++++++----------
 2 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca523fd5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3accbe5..c02f130 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -156,9 +156,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** File suffix of sstable files. */
        private static final String SST_FILE_SUFFIX = ".sst";
 
-       /** Bytes for the name of the column descriptor for the default column 
family. */
-       public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = 
"default".getBytes(ConfigConstants.DEFAULT_CHARSET);
-
        /** String that identifies the operator that owns this backend. */
        private final String operatorIdentifier;
 
@@ -349,29 +346,31 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                if (db != null) {
 
                        // RocksDB's native memory management requires that 
*all* CFs (including default) are closed before the
-                       // DB is closed. So we start with the ones created by 
Flink...
+                       // DB is closed. See:
+                       // 
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
+                       // Start with default CF ...
+                       IOUtils.closeQuietly(defaultColumnFamily);
+
+                       // ... continue with the ones created by Flink...
                        for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
                                kvStateInformation.values()) {
                                IOUtils.closeQuietly(columnMetaData.f0);
                        }
 
-                       // ... close the default CF ...
-                       IOUtils.closeQuietly(defaultColumnFamily);
-
                        // ... and finally close the DB instance ...
                        IOUtils.closeQuietly(db);
 
-                       // invalidate the reference before releasing the lock 
so that other accesses will not cause crashes
+                       // invalidate the reference
                        db = null;
-               }
 
-               kvStateInformation.clear();
-               restoredKvStateMetaInfos.clear();
+                       kvStateInformation.clear();
+                       restoredKvStateMetaInfos.clear();
 
-               IOUtils.closeQuietly(dbOptions);
-               IOUtils.closeQuietly(columnOptions);
+                       IOUtils.closeQuietly(dbOptions);
+                       IOUtils.closeQuietly(columnOptions);
 
-               cleanInstanceBasePath();
+                       cleanInstanceBasePath();
+               }
        }
 
        private void cleanInstanceBasePath() {
@@ -475,11 +474,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                        new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
 
+               // we add the required descriptor for the default CF in FIRST 
position, see
+               // 
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
+               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
                columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
 
-               // we add the required descriptor for the default CF in last 
position.
-               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
-
                RocksDB dbRef;
 
                try {
@@ -602,7 +601,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
                                serializationProxy.getStateMetaInfoSnapshots();
                        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-                       //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
 
                        for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
 
@@ -845,8 +843,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                
stateBackend.instanceRocksDBPath.getAbsolutePath(),
                                columnFamilyDescriptors, columnFamilyHandles);
 
-                       // extract and store the default column family which is 
located at the last index
-                       stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
+                       // extract and store the default column family which is 
located at the first index
+                       stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(0);
 
                        for (int i = 0; i < columnFamilyDescriptors.size(); 
++i) {
                                RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
@@ -1027,8 +1025,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                columnFamilyDescriptors,
                                columnFamilyHandles)) {
 
+                               final ColumnFamilyHandle defaultColumnFamily = 
columnFamilyHandles.remove(0);
+
+                               
Preconditions.checkState(columnFamilyHandles.size() == 
columnFamilyDescriptors.size());
+
                                try {
-                                       // iterating only the requested 
descriptors automatically skips the default column family handle
                                        for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
                                                ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
                                                ColumnFamilyDescriptor 
columnFamilyDescriptor = columnFamilyDescriptors.get(i);
@@ -1085,9 +1086,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                } // releases native iterator 
resources
                                        }
                                } finally {
+
                                        //release native tmp db column family 
resources
-                                       for (ColumnFamilyHandle 
columnFamilyHandle : columnFamilyHandles) {
-                                               
IOUtils.closeQuietly(columnFamilyHandle);
+                                       
IOUtils.closeQuietly(defaultColumnFamily);
+
+                                       for (ColumnFamilyHandle 
flinkColumnFamilyHandle : columnFamilyHandles) {
+                                               
IOUtils.closeQuietly(flinkColumnFamilyHandle);
                                        }
                                }
                        } // releases native tmp db resources
@@ -1165,7 +1169,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-               
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, 
nameBytes),
+               
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, 
nameBytes),
                        "The chosen state name 'default' collides with the name 
of the default column family!");
 
                ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(nameBytes, columnOptions);

http://git-wip-us.apache.org/repos/asf/flink/blob/ca523fd5/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 1d14f6e..19f49f8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.util.IOUtils;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -53,7 +54,7 @@ public class RocksDBMergeIteratorTest {
        @Test
        public void testEmptyMergeIterator() throws IOException {
                RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
-                               new 
RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2);
+                               new 
RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
                Assert.assertFalse(emptyIterator.isValid());
        }
 
@@ -74,8 +75,7 @@ public class RocksDBMergeIteratorTest {
        public void testMergeIterator(int maxParallelism) throws Exception {
                Random random = new Random(1234);
 
-               RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath());
-               try {
+               try (RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
                        List<Tuple2<RocksIterator, Integer>> 
rocksIteratorsWithKVStateId = new ArrayList<>();
                        List<Tuple2<ColumnFamilyHandle, Integer>> 
columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -83,7 +83,7 @@ public class RocksDBMergeIteratorTest {
 
                        for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
                                ColumnFamilyHandle handle = 
rocksDB.createColumnFamily(
-                                               new 
ColumnFamilyDescriptor(("column-" + 
c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
+                                       new ColumnFamilyDescriptor(("column-" + 
c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
 
                                ByteArrayOutputStreamWithPos bos = new 
ByteArrayOutputStreamWithPos();
                                DataOutputStream dos = new 
DataOutputStream(bos);
@@ -113,39 +113,41 @@ public class RocksDBMergeIteratorTest {
                                ++id;
                        }
 
-                       RocksDBKeyedStateBackend.RocksDBMergeIterator 
mergeIterator = new 
RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId, 
maxParallelism <= Byte.MAX_VALUE ? 1 : 2);
+                       try (RocksDBKeyedStateBackend.RocksDBMergeIterator 
mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
+                               rocksIteratorsWithKVStateId,
+                               maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {
 
-                       int prevKVState = -1;
-                       int prevKey = -1;
-                       int prevKeyGroup = -1;
-                       int totalKeysActual = 0;
+                               int prevKVState = -1;
+                               int prevKey = -1;
+                               int prevKeyGroup = -1;
+                               int totalKeysActual = 0;
 
-                       while (mergeIterator.isValid()) {
-                               ByteBuffer bb = 
ByteBuffer.wrap(mergeIterator.key());
+                               while (mergeIterator.isValid()) {
+                                       ByteBuffer bb = 
ByteBuffer.wrap(mergeIterator.key());
 
-                               int keyGroup = maxParallelism > Byte.MAX_VALUE 
? bb.getShort() : bb.get();
-                               int key = bb.getInt();
+                                       int keyGroup = maxParallelism > 
Byte.MAX_VALUE ? bb.getShort() : bb.get();
+                                       int key = bb.getInt();
 
-                               Assert.assertTrue(keyGroup >= prevKeyGroup);
-                               Assert.assertTrue(key >= prevKey);
-                               Assert.assertEquals(prevKeyGroup != keyGroup, 
mergeIterator.isNewKeyGroup());
-                               Assert.assertEquals(prevKVState != 
mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());
+                                       Assert.assertTrue(keyGroup >= 
prevKeyGroup);
+                                       Assert.assertTrue(key >= prevKey);
+                                       Assert.assertEquals(prevKeyGroup != 
keyGroup, mergeIterator.isNewKeyGroup());
+                                       Assert.assertEquals(prevKVState != 
mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());
 
-                               prevKeyGroup = keyGroup;
-                               prevKVState = mergeIterator.kvStateId();
+                                       prevKeyGroup = keyGroup;
+                                       prevKVState = mergeIterator.kvStateId();
 
-                               //System.out.println(keyGroup + " " + key + " " 
+ mergeIterator.kvStateId());
-                               mergeIterator.next();
-                               ++totalKeysActual;
+                                       mergeIterator.next();
+                                       ++totalKeysActual;
+                               }
+
+                               Assert.assertEquals(totalKeysExpected, 
totalKeysActual);
                        }
 
-                       Assert.assertEquals(totalKeysExpected, totalKeysActual);
+                       IOUtils.closeQuietly(rocksDB.getDefaultColumnFamily());
 
                        for (Tuple2<ColumnFamilyHandle, Integer> 
handleWithCount : columnFamilyHandlesWithKeyCount) {
-                               rocksDB.dropColumnFamily(handleWithCount.f0);
+                               IOUtils.closeQuietly(handleWithCount.f0);
                        }
-               } finally {
-                       rocksDB.close();
                }
        }
 

Reply via email to