This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ea3eed MINOR: cleanup RocksDBStore tests (#8510)
6ea3eed is described below
commit 6ea3eedfd8c74d40157d6e489408d871e172ba34
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Sat Apr 18 13:01:16 2020 -0700
MINOR: cleanup RocksDBStore tests (#8510)
One of the new RocksDB unit tests creates a non-temporary RocksDB directory
wherever the test is run from, leaving some RocksDB files behind after the
test(s) are done. The tests should use a temporary directory
(TestUtils.tempDirectory()) instead.
Reviewers: Guozhang Wang <[email protected]>
---
.../streams/state/internals/RocksDBStoreTest.java | 118 ++++++++-------------
.../internals/RocksDBTimestampedStoreTest.java | 4 -
2 files changed, 47 insertions(+), 75 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a67754d..b1ee48f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -74,8 +74,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;
@@ -106,6 +106,11 @@ public class RocksDBStoreTest {
context.metrics().setRocksDBMetricsRecordingTrigger(new
RocksDBMetricsRecordingTrigger());
}
+ @After
+ public void tearDown() {
+ rocksDBStore.close();
+ }
+
RocksDBStore getRocksDBStore() {
return new RocksDBStore(DB_NAME, METRICS_SCOPE);
}
@@ -137,15 +142,10 @@ public class RocksDBStoreTest {
return getProcessorContext(streamsProps);
}
- @After
- public void tearDown() {
- rocksDBStore.close();
- }
-
@Test
public void
shouldAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsDebug() {
final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
getProcessorContext(RecordingLevel.DEBUG);
+ context = getProcessorContext(RecordingLevel.DEBUG);
reset(metricsRecorder);
metricsRecorder.addStatistics(
eq(DB_NAME),
@@ -153,46 +153,46 @@ public class RocksDBStoreTest {
);
replay(metricsRecorder);
- store.openDB(mockContext);
+ store.openDB(context);
verify(metricsRecorder);
}
@Test
public void
shouldNotAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsInfo() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
getProcessorContext(RecordingLevel.INFO);
+ rocksDBStore= getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.INFO);
reset(metricsRecorder);
replay(metricsRecorder);
- store.openDB(mockContext);
+ rocksDBStore.openDB(context);
verify(metricsRecorder);
}
@Test
public void
shouldRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsDebug()
{
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
getProcessorContext(RecordingLevel.DEBUG);
- store.openDB(mockContext);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG);
+ rocksDBStore.openDB(context);
reset(metricsRecorder);
metricsRecorder.removeStatistics(DB_NAME);
replay(metricsRecorder);
- store.close();
+ rocksDBStore.close();
verify(metricsRecorder);
}
@Test
public void
shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsInfo()
{
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
getProcessorContext(RecordingLevel.INFO);
- store.openDB(mockContext);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.INFO);
+ rocksDBStore.openDB(context);
reset(metricsRecorder);
replay(metricsRecorder);
- store.close();
+ rocksDBStore.close();
verify(metricsRecorder);
}
@@ -211,26 +211,22 @@ public class RocksDBStoreTest {
@Test
public void
shouldNotAddStatisticsToInjectedMetricsRecorderWhenUserProvidesStatistics() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
- getProcessorContext(RecordingLevel.DEBUG,
RocksDBConfigSetterWithUserProvidedStatistics.class);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG,
RocksDBConfigSetterWithUserProvidedStatistics.class);
replay(metricsRecorder);
- store.openDB(mockContext);
+ rocksDBStore.openDB(context);
verify(metricsRecorder);
}
@Test
public void
shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserProvidesStatistics()
{
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
- getProcessorContext(RecordingLevel.DEBUG,
RocksDBConfigSetterWithUserProvidedStatistics.class);
- store.openDB(mockContext);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG,
RocksDBConfigSetterWithUserProvidedStatistics.class);
+ rocksDBStore.openDB(context);
reset(metricsRecorder);
replay(metricsRecorder);
- store.close();
-
verify(metricsRecorder);
}
@@ -290,12 +286,7 @@ public class RocksDBStoreTest {
assertTrue(tmpDir.setReadOnly());
- try {
- rocksDBStore.openDB(tmpContext);
- fail("Should have thrown ProcessorStateException");
- } catch (final ProcessorStateException e) {
- // this is good, do nothing
- }
+ assertThrows(ProcessorStateException.class, () ->
rocksDBStore.openDB(tmpContext));
}
@Test
@@ -503,56 +494,41 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullPut() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.put(null, stringSerializer.serialize(null,
"someVal"));
- fail("Should have thrown NullPointerException on null put()");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.put(null, stringSerializer.serialize(null,
"someVal")));
}
@Test
public void shouldThrowNullPointerExceptionOnNullPutAll() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.put(null, stringSerializer.serialize(null,
"someVal"));
- fail("Should have thrown NullPointerException on null put()");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.put(null, stringSerializer.serialize(null,
"someVal")));
}
@Test
public void shouldThrowNullPointerExceptionOnNullGet() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.get(null);
- fail("Should have thrown NullPointerException on null get()");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.get(null));
}
@Test
public void shouldThrowNullPointerExceptionOnDelete() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.delete(null);
- fail("Should have thrown NullPointerException on deleting null
key");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.delete(null));
}
@Test
public void shouldThrowNullPointerExceptionOnRange() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.range(null, new
Bytes(stringSerializer.serialize(null, "2")));
- fail("Should have thrown NullPointerException on null range key");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.range(null, new
Bytes(stringSerializer.serialize(null, "2"))));
}
@Test(expected = ProcessorStateException.class)
@@ -567,10 +543,8 @@ public class RocksDBStoreTest {
@Test
public void shouldHandleToggleOfEnablingBloomFilters() {
-
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
TestingBloomFilterRocksDBConfigSetter.class);
- rocksDBStore = getRocksDBStore();
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
@@ -616,20 +590,22 @@ public class RocksDBStoreTest {
@Test
public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
+ final TaskId taskId = new TaskId(0, 0);
+
final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger =
new RocksDBMetricsRecordingTrigger();
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-application",
StreamsConfig.METRICS_LATEST);
streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
- final ProcessorContext<Object, Object> context =
EasyMock.niceMock(ProcessorContext.class);
+
+ context = EasyMock.niceMock(InternalMockProcessorContext.class);
EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
- final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(context.taskId()).andStubReturn(taskId);
EasyMock.expect(context.appConfigs())
.andStubReturn(new
StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
- EasyMock.expect(context.taskId()).andStubReturn(taskId);
+ EasyMock.expect(context.stateDir()).andStubReturn(dir);
EasyMock.replay(context);
- final RocksDBStore rocksDBStore = new RocksDBStore(DB_NAME,
METRICS_SCOPE);
+
rocksDBStore.init(context, rocksDBStore);
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 3ba2f1f..fdfb6a2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -71,13 +71,10 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
// re-open store
final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister();
- rocksDBStore = getRocksDBStore();
rocksDBStore.init(context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME
+ " in regular mode"));
LogCaptureAppender.unregister(appender);
- rocksDBStore.close();
-
// verify store
final DBOptions dbOptions = new DBOptions();
final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
@@ -201,7 +198,6 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
// approx: 0 entries on old CF, 3 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(3L));
-
iteratorsShouldNotMigrateData();
assertThat(rocksDBStore.approximateNumEntries(), is(3L));