This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new d5eda1e KAFKA-8791: RocksDBTimestampedStore should open in regular mode by default (#7201) d5eda1e is described below commit d5eda1eac8314cc3107208c1098e1f89d41c2ed5 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Tue Aug 13 18:37:45 2019 -0700 KAFKA-8791: RocksDBTimestampedStore should open in regular mode by default (#7201) Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Bill Bejeck <b...@confluent.io>, Richard Yu <yohan.richard...@gmail.com>, Guozhang Wang <guozh...@confluent.io> --- .../state/internals/RocksDBTimestampedStore.java | 32 +- .../internals/RocksDBTimestampedStoreTest.java | 341 ++++++++++++++------- 2 files changed, 240 insertions(+), 133 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index 74f0919..7a7adb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -70,20 +70,7 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped try { db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies); - - final ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0); - - final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily); - noTimestampsIter.seekToFirst(); - if (noTimestampsIter.isValid()) { - log.info("Opening store {} in upgrade mode", name); - dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, columnFamilies.get(1)); - } else { - log.info("Opening store {} in regular mode", name); - dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(1)); - noTimestampColumnFamily.close(); - } - noTimestampsIter.close(); + setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); } catch (final RocksDBException e) { if ("Column family not found: : keyValueWithTimestamp".equals(e.getMessage())) { try { @@ -92,14 +79,27 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped } catch (final RocksDBException fatal) { throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal); } - log.info("Opening store {} in upgrade mode", name); - dbAccessor = new DualColumnFamilyAccessor(columnFamilies.get(0), columnFamilies.get(1)); + setDbAccessor(columnFamilies.get(0), columnFamilies.get(1)); } else { throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e); } } } + private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, + final ColumnFamilyHandle withTimestampColumnFamily) { + final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily); + noTimestampsIter.seekToFirst(); + if (noTimestampsIter.isValid()) { + log.info("Opening store {} in upgrade mode", name); + dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily); + } else { + log.info("Opening store {} in regular mode", name); + dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily); + noTimestampColumnFamily.close(); + } + noTimestampsIter.close(); + } private class DualColumnFamilyAccessor implements RocksDBAccessor { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java index f49527b..e7ad30d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -47,6 +47,79 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { } @Test + public void shouldOpenNewStoreInRegularMode() { + LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class); + + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + rocksDBStore.init(context, rocksDBStore); + assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode")); + LogCaptureAppender.unregister(appender); + + try (final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all()) { + assertThat(iterator.hasNext(), is(false)); + } + } + + @Test + public void shouldOpenExistingStoreInRegularMode() throws Exception { + LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class); + + // prepare store + rocksDBStore.init(context, rocksDBStore); + rocksDBStore.put(new Bytes("key".getBytes()), "timestamped".getBytes()); + rocksDBStore.close(); + + // re-open store + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + rocksDBStore = getRocksDBStore(); + rocksDBStore.init(context, rocksDBStore); + assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode")); + LogCaptureAppender.unregister(appender); + + rocksDBStore.close(); + + // verify store + final DBOptions dbOptions = new DBOptions(); + final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions(); + + final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList( + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions), + new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)); + final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); + + RocksDB db = null; + ColumnFamilyHandle noTimestampColumnFamily = null, withTimestampColumnFamily = null; + try { + db = RocksDB.open( + dbOptions, + new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(), + columnFamilyDescriptors, + columnFamilies); + + noTimestampColumnFamily = columnFamilies.get(0); + withTimestampColumnFamily = columnFamilies.get(1); + + assertThat(db.get(noTimestampColumnFamily, "key".getBytes()), new IsNull<>()); + assertThat(db.getLongProperty(noTimestampColumnFamily, "rocksdb.estimate-num-keys"), is(0L)); + assertThat(db.get(withTimestampColumnFamily, "key".getBytes()).length, is(11)); + assertThat(db.getLongProperty(withTimestampColumnFamily, "rocksdb.estimate-num-keys"), is(1L)); + } finally { + // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions + if (noTimestampColumnFamily != null) { + noTimestampColumnFamily.close(); + } + if (withTimestampColumnFamily != null) { + withTimestampColumnFamily.close(); + } + if (db != null) { + db.close(); + } + dbOptions.close(); + columnFamilyOptions.close(); + } + } + + @Test public void shouldMigrateDataFromDefaultToTimestampColumnFamily() throws Exception { prepareOldStore(); @@ -139,70 +212,70 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { private void iteratorsShouldNotMigrateData() { // iterating should not migrate any data, but return all key over both CF (plus surrogate timestamps for old CF) - final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all(); - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key1".getBytes(), keyValue.key.get()); - // unknown timestamp == -1 plus value == 1 - assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key11".getBytes(), keyValue.key.get()); - assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key2".getBytes(), keyValue.key.get()); - assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key4".getBytes(), keyValue.key.get()); - // unknown timestamp == -1 plus value == 4444 - assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key5".getBytes(), keyValue.key.get()); - // unknown timestamp == -1 plus value == 55555 - assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value); + try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all()) { + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key1".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 1 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key11".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key2".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key4".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 4444 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key5".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 55555 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key7".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 7777777 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '7', '7', '7', '7', '7', '7', '7'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = itAll.next(); + assertArrayEquals("key8".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value); + } + assertFalse(itAll.hasNext()); } - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key7".getBytes(), keyValue.key.get()); - // unknown timestamp == -1 plus value == 7777777 - assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '7', '7', '7', '7', '7', '7', '7'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = itAll.next(); - assertArrayEquals("key8".getBytes(), keyValue.key.get()); - assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value); - } - assertFalse(itAll.hasNext()); - itAll.close(); - - final KeyValueIterator<Bytes, byte[]> it = - rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes())); - { - final KeyValue<Bytes, byte[]> keyValue = it.next(); - assertArrayEquals("key2".getBytes(), keyValue.key.get()); - assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = it.next(); - assertArrayEquals("key4".getBytes(), keyValue.key.get()); - // unknown timestamp == -1 plus value == 4444 - assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value); - } - { - final KeyValue<Bytes, byte[]> keyValue = it.next(); - assertArrayEquals("key5".getBytes(), keyValue.key.get()); - // unknown timestamp == -1 plus value == 55555 - assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value); + + try (final KeyValueIterator<Bytes, byte[]> it = + rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) { + { + final KeyValue<Bytes, byte[]> keyValue = it.next(); + assertArrayEquals("key2".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = it.next(); + assertArrayEquals("key4".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 4444 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value); + } + { + final KeyValue<Bytes, byte[]> keyValue = it.next(); + assertArrayEquals("key5".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 55555 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value); + } + assertFalse(it.hasNext()); } - assertFalse(it.hasNext()); - it.close(); } private void verifyOldAndNewColumnFamily() throws Exception { @@ -214,40 +287,60 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions)); final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size()); - RocksDB db = RocksDB.open( - dbOptions, - new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(), - columnFamilyDescriptors, - columnFamilies); - - ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0); - final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1); - - assertThat(db.get(noTimestampColumnFamily, "unknown".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key1".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key2".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key3".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key4".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key7".getBytes()).length, is(7)); - assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), new IsNull<>()); - assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), new IsNull<>()); - - assertThat(db.get(withTimestampColumnFamily, "unknown".getBytes()), new IsNull<>()); - assertThat(db.get(withTimestampColumnFamily, "key1".getBytes()).length, is(8 + 1)); - assertThat(db.get(withTimestampColumnFamily, "key2".getBytes()).length, is(12)); - assertThat(db.get(withTimestampColumnFamily, "key3".getBytes()), new IsNull<>()); - assertThat(db.get(withTimestampColumnFamily, "key4".getBytes()).length, is(8 + 4)); - assertThat(db.get(withTimestampColumnFamily, "key5".getBytes()).length, is(8 + 5)); - assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), new IsNull<>()); - assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()), new IsNull<>()); - assertThat(db.get(withTimestampColumnFamily, "key8".getBytes()).length, is(18)); - assertThat(db.get(withTimestampColumnFamily, "key11".getBytes()).length, is(21)); - assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), new IsNull<>()); - - db.close(); + RocksDB db = null; + ColumnFamilyHandle noTimestampColumnFamily = null, withTimestampColumnFamily = null; + boolean errorOccurred = false; + try { + db = RocksDB.open( + dbOptions, + new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(), + columnFamilyDescriptors, + columnFamilies); + + noTimestampColumnFamily = columnFamilies.get(0); + withTimestampColumnFamily = columnFamilies.get(1); + + assertThat(db.get(noTimestampColumnFamily, "unknown".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key1".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key2".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key3".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key4".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key7".getBytes()).length, is(7)); + assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), new IsNull<>()); + assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), new IsNull<>()); + + assertThat(db.get(withTimestampColumnFamily, "unknown".getBytes()), new IsNull<>()); + assertThat(db.get(withTimestampColumnFamily, "key1".getBytes()).length, is(8 + 1)); + assertThat(db.get(withTimestampColumnFamily, "key2".getBytes()).length, is(12)); + assertThat(db.get(withTimestampColumnFamily, "key3".getBytes()), new IsNull<>()); + assertThat(db.get(withTimestampColumnFamily, "key4".getBytes()).length, is(8 + 4)); + assertThat(db.get(withTimestampColumnFamily, "key5".getBytes()).length, is(8 + 5)); + assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), new IsNull<>()); + assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()), new IsNull<>()); + assertThat(db.get(withTimestampColumnFamily, "key8".getBytes()).length, is(18)); + assertThat(db.get(withTimestampColumnFamily, "key11".getBytes()).length, is(21)); + assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), new IsNull<>()); + } catch (final RuntimeException fatal) { + errorOccurred = true; + } finally { + // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions + if (noTimestampColumnFamily != null) { + noTimestampColumnFamily.close(); + } + if (withTimestampColumnFamily != null) { + withTimestampColumnFamily.close(); + } + if (db != null) { + db.close(); + } + if (errorOccurred) { + dbOptions.close(); + columnFamilyOptions.close(); + } + } // check that still in upgrade mode LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); @@ -258,15 +351,28 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { // clear old CF columnFamilies.clear(); - db = RocksDB.open( - dbOptions, - new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(), - columnFamilyDescriptors, - columnFamilies); - - noTimestampColumnFamily = columnFamilies.get(0); - db.delete(noTimestampColumnFamily, "key7".getBytes()); - db.close(); + db = null; + noTimestampColumnFamily = null; + try { + db = RocksDB.open( + dbOptions, + new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(), + columnFamilyDescriptors, + columnFamilies); + + noTimestampColumnFamily = columnFamilies.get(0); + db.delete(noTimestampColumnFamily, "key7".getBytes()); + } finally { + // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions + if (noTimestampColumnFamily != null) { + noTimestampColumnFamily.close(); + } + if (db != null) { + db.close(); + } + dbOptions.close(); + columnFamilyOptions.close(); + } // check that still in regular mode appender = LogCaptureAppender.createAndRegister(); @@ -277,17 +383,18 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { private void prepareOldStore() { final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME); - keyValueStore.init(context, keyValueStore); - - keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes()); - keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes()); - keyValueStore.put(new Bytes("key3".getBytes()), "333".getBytes()); - keyValueStore.put(new Bytes("key4".getBytes()), "4444".getBytes()); - keyValueStore.put(new Bytes("key5".getBytes()), "55555".getBytes()); - keyValueStore.put(new Bytes("key6".getBytes()), "666666".getBytes()); - keyValueStore.put(new Bytes("key7".getBytes()), "7777777".getBytes()); - - keyValueStore.close(); + try { + keyValueStore.init(context, keyValueStore); + + keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes()); + keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes()); + keyValueStore.put(new Bytes("key3".getBytes()), "333".getBytes()); + keyValueStore.put(new Bytes("key4".getBytes()), "4444".getBytes()); + keyValueStore.put(new Bytes("key5".getBytes()), "55555".getBytes()); + keyValueStore.put(new Bytes("key6".getBytes()), "666666".getBytes()); + keyValueStore.put(new Bytes("key7".getBytes()), "7777777".getBytes()); + } finally { + keyValueStore.close(); + } } - }