This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new d5eda1e  KAFKA-8791: RocksDBTimestampedStore should open in regular 
mode by default (#7201)
d5eda1e is described below

commit d5eda1eac8314cc3107208c1098e1f89d41c2ed5
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Tue Aug 13 18:37:45 2019 -0700

    KAFKA-8791: RocksDBTimestampedStore should open in regular mode by default 
(#7201)
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Bill Bejeck 
<b...@confluent.io>, Richard Yu <yohan.richard...@gmail.com>, Guozhang Wang 
<guozh...@confluent.io>
---
 .../state/internals/RocksDBTimestampedStore.java   |  32 +-
 .../internals/RocksDBTimestampedStoreTest.java     | 341 ++++++++++++++-------
 2 files changed, 240 insertions(+), 133 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 74f0919..7a7adb4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -70,20 +70,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
 
         try {
             db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), 
columnFamilyDescriptors, columnFamilies);
-
-            final ColumnFamilyHandle noTimestampColumnFamily = 
columnFamilies.get(0);
-
-            final RocksIterator noTimestampsIter = 
db.newIterator(noTimestampColumnFamily);
-            noTimestampsIter.seekToFirst();
-            if (noTimestampsIter.isValid()) {
-                log.info("Opening store {} in upgrade mode", name);
-                dbAccessor = new 
DualColumnFamilyAccessor(noTimestampColumnFamily, columnFamilies.get(1));
-            } else {
-                log.info("Opening store {} in regular mode", name);
-                dbAccessor = new 
SingleColumnFamilyAccessor(columnFamilies.get(1));
-                noTimestampColumnFamily.close();
-            }
-            noTimestampsIter.close();
+            setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
         } catch (final RocksDBException e) {
             if ("Column family not found: : 
keyValueWithTimestamp".equals(e.getMessage())) {
                 try {
@@ -92,14 +79,27 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
                 } catch (final RocksDBException fatal) {
                     throw new ProcessorStateException("Error opening store " + 
name + " at location " + dbDir.toString(), fatal);
                 }
-                log.info("Opening store {} in upgrade mode", name);
-                dbAccessor = new 
DualColumnFamilyAccessor(columnFamilies.get(0), columnFamilies.get(1));
+                setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
             } else {
                 throw new ProcessorStateException("Error opening store " + 
name + " at location " + dbDir.toString(), e);
             }
         }
     }
 
+    private void setDbAccessor(final ColumnFamilyHandle 
noTimestampColumnFamily,
+                               final ColumnFamilyHandle 
withTimestampColumnFamily) {
+        final RocksIterator noTimestampsIter = 
db.newIterator(noTimestampColumnFamily);
+        noTimestampsIter.seekToFirst();
+        if (noTimestampsIter.isValid()) {
+            log.info("Opening store {} in upgrade mode", name);
+            dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, 
withTimestampColumnFamily);
+        } else {
+            log.info("Opening store {} in regular mode", name);
+            dbAccessor = new 
SingleColumnFamilyAccessor(withTimestampColumnFamily);
+            noTimestampColumnFamily.close();
+        }
+        noTimestampsIter.close();
+    }
 
 
     private class DualColumnFamilyAccessor implements RocksDBAccessor {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index f49527b..e7ad30d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -47,6 +47,79 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
     }
 
     @Test
+    public void shouldOpenNewStoreInRegularMode() {
+        
LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class);
+
+        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+        rocksDBStore.init(context, rocksDBStore);
+        assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME 
+ " in regular mode"));
+        LogCaptureAppender.unregister(appender);
+
+        try (final KeyValueIterator<Bytes, byte[]> iterator = 
rocksDBStore.all()) {
+            assertThat(iterator.hasNext(), is(false));
+        }
+    }
+
+    @Test
+    public void shouldOpenExistingStoreInRegularMode() throws Exception {
+        
LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class);
+
+        // prepare store
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.put(new Bytes("key".getBytes()), 
"timestamped".getBytes());
+        rocksDBStore.close();
+
+        // re-open store
+        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+        rocksDBStore = getRocksDBStore();
+        rocksDBStore.init(context, rocksDBStore);
+        assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME 
+ " in regular mode"));
+        LogCaptureAppender.unregister(appender);
+
+        rocksDBStore.close();
+
+        // verify store
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+
+        RocksDB db = null;
+        ColumnFamilyHandle noTimestampColumnFamily = null, 
withTimestampColumnFamily = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            noTimestampColumnFamily = columnFamilies.get(0);
+            withTimestampColumnFamily = columnFamilies.get(1);
+
+            assertThat(db.get(noTimestampColumnFamily, "key".getBytes()), new 
IsNull<>());
+            assertThat(db.getLongProperty(noTimestampColumnFamily, 
"rocksdb.estimate-num-keys"), is(0L));
+            assertThat(db.get(withTimestampColumnFamily, 
"key".getBytes()).length, is(11));
+            assertThat(db.getLongProperty(withTimestampColumnFamily, 
"rocksdb.estimate-num-keys"), is(1L));
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > 
DBOptions > ColumnFamilyOptions
+            if (noTimestampColumnFamily != null) {
+                noTimestampColumnFamily.close();
+            }
+            if (withTimestampColumnFamily != null) {
+                withTimestampColumnFamily.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    @Test
     public void shouldMigrateDataFromDefaultToTimestampColumnFamily() throws 
Exception {
         prepareOldStore();
 
@@ -139,70 +212,70 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
 
     private void iteratorsShouldNotMigrateData() {
         // iterating should not migrate any data, but return all key over both 
CF (plus surrogate timestamps for old CF)
-        final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all();
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key1".getBytes(), keyValue.key.get());
-            // unknown timestamp == -1 plus value == 1
-            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, 
keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key11".getBytes(), keyValue.key.get());
-            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 
'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, 
keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key2".getBytes(), keyValue.key.get());
-            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 
'm', 'p', '+', '2', '2'}, keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key4".getBytes(), keyValue.key.get());
-            // unknown timestamp == -1 plus value == 4444
-            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', 
'4', '4', '4'}, keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key5".getBytes(), keyValue.key.get());
-            // unknown timestamp == -1 plus value == 55555
-            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', 
'5', '5', '5', '5'}, keyValue.value);
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all()) 
{
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key1".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 1
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, 
'1'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key11".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, 
keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 4444
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, 
'4', '4', '4', '4'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 55555
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, 
'5', '5', '5', '5', '5'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key7".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 7777777
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, 
'7', '7', '7', '7', '7', '7', '7'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key8".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
+            }
+            assertFalse(itAll.hasNext());
         }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key7".getBytes(), keyValue.key.get());
-            // unknown timestamp == -1 plus value == 7777777
-            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '7', 
'7', '7', '7', '7', '7', '7'}, keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-            assertArrayEquals("key8".getBytes(), keyValue.key.get());
-            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 
'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
-        }
-        assertFalse(itAll.hasNext());
-        itAll.close();
-
-        final KeyValueIterator<Bytes, byte[]> it =
-            rocksDBStore.range(new Bytes("key2".getBytes()), new 
Bytes("key5".getBytes()));
-        {
-            final KeyValue<Bytes, byte[]> keyValue = it.next();
-            assertArrayEquals("key2".getBytes(), keyValue.key.get());
-            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 
'm', 'p', '+', '2', '2'}, keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = it.next();
-            assertArrayEquals("key4".getBytes(), keyValue.key.get());
-            // unknown timestamp == -1 plus value == 4444
-            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', 
'4', '4', '4'}, keyValue.value);
-        }
-        {
-            final KeyValue<Bytes, byte[]> keyValue = it.next();
-            assertArrayEquals("key5".getBytes(), keyValue.key.get());
-            // unknown timestamp == -1 plus value == 55555
-            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', 
'5', '5', '5', '5'}, keyValue.value);
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                rocksDBStore.range(new Bytes("key2".getBytes()), new 
Bytes("key5".getBytes()))) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 4444
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, 
'4', '4', '4', '4'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 55555
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, 
'5', '5', '5', '5', '5'}, keyValue.value);
+            }
+            assertFalse(it.hasNext());
         }
-        assertFalse(it.hasNext());
-        it.close();
     }
 
     private void verifyOldAndNewColumnFamily() throws Exception {
@@ -214,40 +287,60 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             new 
ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
         final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
 
-        RocksDB db = RocksDB.open(
-            dbOptions,
-            new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
-            columnFamilyDescriptors,
-            columnFamilies);
-
-        ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
-        final ColumnFamilyHandle withTimestampColumnFamily = 
columnFamilies.get(1);
-
-        assertThat(db.get(noTimestampColumnFamily, "unknown".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key1".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key2".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key3".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key4".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key7".getBytes()).length, 
is(7));
-        assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), new 
IsNull<>());
-        assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), new 
IsNull<>());
-
-        assertThat(db.get(withTimestampColumnFamily, "unknown".getBytes()), 
new IsNull<>());
-        assertThat(db.get(withTimestampColumnFamily, 
"key1".getBytes()).length, is(8 + 1));
-        assertThat(db.get(withTimestampColumnFamily, 
"key2".getBytes()).length, is(12));
-        assertThat(db.get(withTimestampColumnFamily, "key3".getBytes()), new 
IsNull<>());
-        assertThat(db.get(withTimestampColumnFamily, 
"key4".getBytes()).length, is(8 + 4));
-        assertThat(db.get(withTimestampColumnFamily, 
"key5".getBytes()).length, is(8 + 5));
-        assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), new 
IsNull<>());
-        assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()), new 
IsNull<>());
-        assertThat(db.get(withTimestampColumnFamily, 
"key8".getBytes()).length, is(18));
-        assertThat(db.get(withTimestampColumnFamily, 
"key11".getBytes()).length, is(21));
-        assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), new 
IsNull<>());
-
-        db.close();
+        RocksDB db = null;
+        ColumnFamilyHandle noTimestampColumnFamily = null, 
withTimestampColumnFamily = null;
+        boolean errorOccurred = false;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            noTimestampColumnFamily = columnFamilies.get(0);
+            withTimestampColumnFamily = columnFamilies.get(1);
+
+            assertThat(db.get(noTimestampColumnFamily, "unknown".getBytes()), 
new IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key1".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key2".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key3".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key4".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, 
"key7".getBytes()).length, is(7));
+            assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new 
IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), 
new IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), 
new IsNull<>());
+
+            assertThat(db.get(withTimestampColumnFamily, 
"unknown".getBytes()), new IsNull<>());
+            assertThat(db.get(withTimestampColumnFamily, 
"key1".getBytes()).length, is(8 + 1));
+            assertThat(db.get(withTimestampColumnFamily, 
"key2".getBytes()).length, is(12));
+            assertThat(db.get(withTimestampColumnFamily, "key3".getBytes()), 
new IsNull<>());
+            assertThat(db.get(withTimestampColumnFamily, 
"key4".getBytes()).length, is(8 + 4));
+            assertThat(db.get(withTimestampColumnFamily, 
"key5".getBytes()).length, is(8 + 5));
+            assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), 
new IsNull<>());
+            assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()), 
new IsNull<>());
+            assertThat(db.get(withTimestampColumnFamily, 
"key8".getBytes()).length, is(18));
+            assertThat(db.get(withTimestampColumnFamily, 
"key11".getBytes()).length, is(21));
+            assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), 
new IsNull<>());
+        } catch (final RuntimeException fatal) {
+            errorOccurred = true;
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > 
DBOptions > ColumnFamilyOptions
+            if (noTimestampColumnFamily != null) {
+                noTimestampColumnFamily.close();
+            }
+            if (withTimestampColumnFamily != null) {
+                withTimestampColumnFamily.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            if (errorOccurred) {
+                dbOptions.close();
+                columnFamilyOptions.close();
+            }
+        }
 
         // check that still in upgrade mode
         LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
@@ -258,15 +351,28 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
 
         // clear old CF
         columnFamilies.clear();
-        db = RocksDB.open(
-            dbOptions,
-            new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
-            columnFamilyDescriptors,
-            columnFamilies);
-
-        noTimestampColumnFamily = columnFamilies.get(0);
-        db.delete(noTimestampColumnFamily, "key7".getBytes());
-        db.close();
+        db = null;
+        noTimestampColumnFamily = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            noTimestampColumnFamily = columnFamilies.get(0);
+            db.delete(noTimestampColumnFamily, "key7".getBytes());
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > 
DBOptions > ColumnFamilyOptions
+            if (noTimestampColumnFamily != null) {
+                noTimestampColumnFamily.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
 
         // check that still in regular mode
         appender = LogCaptureAppender.createAndRegister();
@@ -277,17 +383,18 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
 
     private void prepareOldStore() {
         final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME);
-        keyValueStore.init(context, keyValueStore);
-
-        keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
-        keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
-        keyValueStore.put(new Bytes("key3".getBytes()), "333".getBytes());
-        keyValueStore.put(new Bytes("key4".getBytes()), "4444".getBytes());
-        keyValueStore.put(new Bytes("key5".getBytes()), "55555".getBytes());
-        keyValueStore.put(new Bytes("key6".getBytes()), "666666".getBytes());
-        keyValueStore.put(new Bytes("key7".getBytes()), "7777777".getBytes());
-
-        keyValueStore.close();
+        try {
+            keyValueStore.init(context, keyValueStore);
+
+            keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
+            keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
+            keyValueStore.put(new Bytes("key3".getBytes()), "333".getBytes());
+            keyValueStore.put(new Bytes("key4".getBytes()), "4444".getBytes());
+            keyValueStore.put(new Bytes("key5".getBytes()), 
"55555".getBytes());
+            keyValueStore.put(new Bytes("key6".getBytes()), 
"666666".getBytes());
+            keyValueStore.put(new Bytes("key7".getBytes()), 
"7777777".getBytes());
+        } finally {
+            keyValueStore.close();
+        }
     }
-
 }

Reply via email to