This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6ea3eed MINOR: cleanup RocksDBStore tests (#8510) 6ea3eed is described below commit 6ea3eedfd8c74d40157d6e489408d871e172ba34 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Sat Apr 18 13:01:16 2020 -0700 MINOR: cleanup RocksDBStore tests (#8510) One of the new rocksdb unit tests creates a non-temporary rocksdb directory wherever the test is run from, with some rocksdb files left behind after the test(s) are done. We should use the tempDirectory dir for this testing Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../streams/state/internals/RocksDBStoreTest.java | 118 ++++++++------------- .../internals/RocksDBTimestampedStoreTest.java | 4 - 2 files changed, 47 insertions(+), 75 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index a67754d..b1ee48f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -74,8 +74,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.powermock.api.easymock.PowerMock.replay; import static org.powermock.api.easymock.PowerMock.verify; @@ -106,6 +106,11 @@ public class RocksDBStoreTest { context.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger()); } + @After + public void tearDown() { + rocksDBStore.close(); + } + RocksDBStore getRocksDBStore() { return new RocksDBStore(DB_NAME, METRICS_SCOPE); } @@ -137,15 +142,10 @@ public class RocksDBStoreTest { return getProcessorContext(streamsProps); } - @After - public void tearDown() { - rocksDBStore.close(); - } - @Test public void shouldAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsDebug() { final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder(); - final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.DEBUG); + context = getProcessorContext(RecordingLevel.DEBUG); reset(metricsRecorder); metricsRecorder.addStatistics( eq(DB_NAME), @@ -153,46 +153,46 @@ public class RocksDBStoreTest { ); replay(metricsRecorder); - store.openDB(mockContext); + store.openDB(context); verify(metricsRecorder); } @Test public void shouldNotAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsInfo() { - final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder(); - final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.INFO); + rocksDBStore= getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.INFO); reset(metricsRecorder); replay(metricsRecorder); - store.openDB(mockContext); + rocksDBStore.openDB(context); verify(metricsRecorder); } @Test public void shouldRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsDebug() { - final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder(); - final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.DEBUG); - store.openDB(mockContext); + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.DEBUG); + rocksDBStore.openDB(context); reset(metricsRecorder); metricsRecorder.removeStatistics(DB_NAME); replay(metricsRecorder); - store.close(); + rocksDBStore.close(); verify(metricsRecorder); } @Test public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsInfo() { - final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder(); - final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.INFO); - store.openDB(mockContext); + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.INFO); + rocksDBStore.openDB(context); reset(metricsRecorder); replay(metricsRecorder); - store.close(); + rocksDBStore.close(); verify(metricsRecorder); } @@ -211,26 +211,22 @@ public class RocksDBStoreTest { @Test public void shouldNotAddStatisticsToInjectedMetricsRecorderWhenUserProvidesStatistics() { - final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder(); - final InternalMockProcessorContext mockContext = - getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class); + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class); replay(metricsRecorder); - store.openDB(mockContext); + rocksDBStore.openDB(context); verify(metricsRecorder); } @Test public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserProvidesStatistics() { - final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder(); - final InternalMockProcessorContext mockContext = - getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class); - store.openDB(mockContext); + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class); + rocksDBStore.openDB(context); reset(metricsRecorder); replay(metricsRecorder); - store.close(); - verify(metricsRecorder); } @@ -290,12 +286,7 @@ public class RocksDBStoreTest { assertTrue(tmpDir.setReadOnly()); - try { - rocksDBStore.openDB(tmpContext); - fail("Should have thrown ProcessorStateException"); - } catch (final ProcessorStateException e) { - // this is good, do nothing - } + assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext)); } @Test @@ -503,56 +494,41 @@ public class RocksDBStoreTest { @Test public void shouldThrowNullPointerExceptionOnNullPut() { rocksDBStore.init(context, rocksDBStore); - try { - rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")); - fail("Should have thrown NullPointerException on null put()"); - } catch (final NullPointerException e) { - // this is good - } + assertThrows( + NullPointerException.class, + () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"))); } @Test public void shouldThrowNullPointerExceptionOnNullPutAll() { rocksDBStore.init(context, rocksDBStore); - try { - rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")); - fail("Should have thrown NullPointerException on null put()"); - } catch (final NullPointerException e) { - // this is good - } + assertThrows( + NullPointerException.class, + () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"))); } @Test public void shouldThrowNullPointerExceptionOnNullGet() { rocksDBStore.init(context, rocksDBStore); - try { - rocksDBStore.get(null); - fail("Should have thrown NullPointerException on null get()"); - } catch (final NullPointerException e) { - // this is good - } + assertThrows( + NullPointerException.class, + () -> rocksDBStore.get(null)); } @Test public void shouldThrowNullPointerExceptionOnDelete() { rocksDBStore.init(context, rocksDBStore); - try { - rocksDBStore.delete(null); - fail("Should have thrown NullPointerException on deleting null key"); - } catch (final NullPointerException e) { - // this is good - } + assertThrows( + NullPointerException.class, + () -> rocksDBStore.delete(null)); } @Test public void shouldThrowNullPointerExceptionOnRange() { rocksDBStore.init(context, rocksDBStore); - try { - rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))); - fail("Should have thrown NullPointerException on null range key"); - } catch (final NullPointerException e) { - // this is good - } + assertThrows( + NullPointerException.class, + () -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")))); } @Test(expected = ProcessorStateException.class) @@ -567,10 +543,8 @@ public class RocksDBStoreTest { @Test public void shouldHandleToggleOfEnablingBloomFilters() { - final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class); - rocksDBStore = getRocksDBStore(); dir = TestUtils.tempDirectory(); context = new InternalMockProcessorContext(dir, Serdes.String(), @@ -616,20 +590,22 @@ public class RocksDBStoreTest { @Test public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() { + final TaskId taskId = new TaskId(0, 0); + final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(); final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST); streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger); - final ProcessorContext<Object, Object> context = EasyMock.niceMock(ProcessorContext.class); + + context = EasyMock.niceMock(InternalMockProcessorContext.class); EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics); - final TaskId taskId = new TaskId(0, 0); EasyMock.expect(context.taskId()).andStubReturn(taskId); EasyMock.expect(context.appConfigs()) .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals()); - EasyMock.expect(context.taskId()).andStubReturn(taskId); + EasyMock.expect(context.stateDir()).andStubReturn(dir); EasyMock.replay(context); - final RocksDBStore rocksDBStore = new RocksDBStore(DB_NAME, METRICS_SCOPE); + rocksDBStore.init(context, rocksDBStore); final byte[] key = "hello".getBytes(); final byte[] value = "world".getBytes(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java index 3ba2f1f..fdfb6a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -71,13 +71,10 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { // re-open store final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - rocksDBStore = getRocksDBStore(); rocksDBStore.init(context, rocksDBStore); assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode")); LogCaptureAppender.unregister(appender); - rocksDBStore.close(); - // verify store final DBOptions dbOptions = new DBOptions(); final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions(); @@ -201,7 +198,6 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { // approx: 0 entries on old CF, 3 in new CF assertThat(rocksDBStore.approximateNumEntries(), is(3L)); - iteratorsShouldNotMigrateData(); assertThat(rocksDBStore.approximateNumEntries(), is(3L));