[hotfix] RocksDB: make default column family first. According to the documentation of RocksDB, the default column family should always be created first.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca523fd5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca523fd5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca523fd5 Branch: refs/heads/master Commit: ca523fd556d3c8411ca550e20087d18fd3e72a0c Parents: 0f27116 Author: Stefan Richter <[email protected]> Authored: Wed Jan 17 13:05:17 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:59:54 2018 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 52 ++++++++++--------- .../state/RocksDBMergeIteratorTest.java | 54 ++++++++++---------- 2 files changed, 56 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ca523fd5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 3accbe5..c02f130 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -156,9 +156,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** File suffix of sstable files. */ private static final String SST_FILE_SUFFIX = ".sst"; - /** Bytes for the name of the column descriptor for the default column family. 
*/ - public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET); - /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; @@ -349,29 +346,31 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (db != null) { // RocksDB's native memory management requires that *all* CFs (including default) are closed before the - // DB is closed. So we start with the ones created by Flink... + // DB is closed. See: + // https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families + // Start with default CF ... + IOUtils.closeQuietly(defaultColumnFamily); + + // ... continue with the ones created by Flink... for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : kvStateInformation.values()) { IOUtils.closeQuietly(columnMetaData.f0); } - // ... close the default CF ... - IOUtils.closeQuietly(defaultColumnFamily); - // ... and finally close the DB instance ... 
IOUtils.closeQuietly(db); - // invalidate the reference before releasing the lock so that other accesses will not cause crashes + // invalidate the reference db = null; - } - kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); - IOUtils.closeQuietly(dbOptions); - IOUtils.closeQuietly(columnOptions); + IOUtils.closeQuietly(dbOptions); + IOUtils.closeQuietly(columnOptions); - cleanInstanceBasePath(); + cleanInstanceBasePath(); + } } private void cleanInstanceBasePath() { @@ -475,11 +474,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); + // we add the required descriptor for the default CF in FIRST position, see + // https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families + columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions)); columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - // we add the required descriptor for the default CF in last position. 
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); - RocksDB dbRef; try { @@ -602,7 +601,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { @@ -845,8 +843,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateBackend.instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); - // extract and store the default column family which is located at the last index - stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1); + // extract and store the default column family which is located at the first index + stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0); for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); @@ -1027,8 +1025,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { columnFamilyDescriptors, columnFamilyHandles)) { + final ColumnFamilyHandle defaultColumnFamily = columnFamilyHandles.remove(0); + + Preconditions.checkState(columnFamilyHandles.size() == columnFamilyDescriptors.size()); + try { - // iterating only the requested descriptors automatically skips the default column family handle for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); @@ -1085,9 +1086,12 @@ public 
class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } // releases native iterator resources } } finally { + //release native tmp db column family resources - for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - IOUtils.closeQuietly(columnFamilyHandle); + IOUtils.closeQuietly(defaultColumnFamily); + + for (ColumnFamilyHandle flinkColumnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(flinkColumnFamilyHandle); } } } // releases native tmp db resources @@ -1165,7 +1169,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes), + Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), "The chosen state name 'default' collides with the name of the default column family!"); ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); http://git-wip-us.apache.org/repos/asf/flink/blob/ca523fd5/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java index 1d14f6e..19f49f8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state; import 
org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.util.IOUtils; import org.junit.Assert; import org.junit.Rule; @@ -53,7 +54,7 @@ public class RocksDBMergeIteratorTest { @Test public void testEmptyMergeIterator() throws IOException { RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator = - new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2); + new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2); Assert.assertFalse(emptyIterator.isValid()); } @@ -74,8 +75,7 @@ public class RocksDBMergeIteratorTest { public void testMergeIterator(int maxParallelism) throws Exception { Random random = new Random(1234); - RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath()); - try { + try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) { List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>(); @@ -83,7 +83,7 @@ public class RocksDBMergeIteratorTest { for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) { ColumnFamilyHandle handle = rocksDB.createColumnFamily( - new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET))); + new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET))); ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos(); DataOutputStream dos = new DataOutputStream(bos); @@ -113,39 +113,41 @@ public class RocksDBMergeIteratorTest { ++id; } - RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId, maxParallelism <= Byte.MAX_VALUE ? 
1 : 2); + try (RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator( + rocksIteratorsWithKVStateId, + maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) { - int prevKVState = -1; - int prevKey = -1; - int prevKeyGroup = -1; - int totalKeysActual = 0; + int prevKVState = -1; + int prevKey = -1; + int prevKeyGroup = -1; + int totalKeysActual = 0; - while (mergeIterator.isValid()) { - ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key()); + while (mergeIterator.isValid()) { + ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key()); - int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get(); - int key = bb.getInt(); + int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get(); + int key = bb.getInt(); - Assert.assertTrue(keyGroup >= prevKeyGroup); - Assert.assertTrue(key >= prevKey); - Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup()); - Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState()); + Assert.assertTrue(keyGroup >= prevKeyGroup); + Assert.assertTrue(key >= prevKey); + Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup()); + Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState()); - prevKeyGroup = keyGroup; - prevKVState = mergeIterator.kvStateId(); + prevKeyGroup = keyGroup; + prevKVState = mergeIterator.kvStateId(); - //System.out.println(keyGroup + " " + key + " " + mergeIterator.kvStateId()); - mergeIterator.next(); - ++totalKeysActual; + mergeIterator.next(); + ++totalKeysActual; + } + + Assert.assertEquals(totalKeysExpected, totalKeysActual); } - Assert.assertEquals(totalKeysExpected, totalKeysActual); + IOUtils.closeQuietly(rocksDB.getDefaultColumnFamily()); for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount) { - rocksDB.dropColumnFamily(handleWithCount.f0); + IOUtils.closeQuietly(handleWithCount.f0); } - 
} finally { - rocksDB.close(); } }
