cadonna commented on code in PR #12959:
URL: https://github.com/apache/kafka/pull/12959#discussion_r1043244790
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java:
##########
@@ -130,6 +134,8 @@ public void shouldDeleteKeyAndPropagateV0() {
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
);
+
+ stateStore.close();
Review Comment:
Why do you close the state store in this test but not in the other tests?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -148,6 +148,11 @@ private void verifyDBOptionsMethodCall(final Method
method) throws Exception {
assertThat(undeclaredMockMethodCall.getCause(),
instanceOf(AssertionError.class));
assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
matchesPattern("Unexpected method call DBOptions\\." +
method.getName() + "((.*\n*)*):"));
+ } finally {
+ reset(mockedDbOptions);
+ mockedDbOptions.close();
+ replay(mockedDbOptions);
+ optionsFacadeDbOptions.close();
Review Comment:
I think it would be enough to just call `optionsFacadeDbOptions.close();`. You
already verify that `optionsFacadeDbOptions.close();` calls
`mockedDbOptions.close();` in the try-block, so there is no need to verify it
again. Alternatively, you could add `close` to the list of ignored methods
(`ignoreMethods`) and verify as you did. However, you would then need to add
`verify(mockedDbOptions)` after `optionsFacadeDbOptions.close()`; otherwise,
nothing is verified.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -333,23 +343,23 @@ public void shouldLogWarningWhenSettingWalOptions()
throws Exception {
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class))
{
- final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter
adapter
- = new
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new
ColumnFamilyOptions());
-
- for (final Method method :
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods())
{
- if (walRelatedMethods.contains(method.getName())) {
- method.invoke(adapter,
getDBOptionsParameters(method.getParameterTypes()));
+ try (RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter
adapter =
+ new
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new
ColumnFamilyOptions())) {
+ for (final Method method :
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods())
{
+ if (walRelatedMethods.contains(method.getName())) {
+ method.invoke(adapter,
getDBOptionsParameters(method.getParameterTypes()));
+ }
}
- }
- final List<String> walOptions = Arrays.asList("walDir",
"walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB",
"manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
+ final List<String> walOptions = Arrays.asList("walDir",
"walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB",
"manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
- final Set<String> logMessages = appender.getEvents().stream()
- .filter(e -> e.getLevel().equals("WARN"))
- .map(LogCaptureAppender.Event::getMessage)
- .collect(Collectors.toSet());
+ final Set<String> logMessages = appender.getEvents().stream()
+ .filter(e -> e.getLevel().equals("WARN"))
+ .map(LogCaptureAppender.Event::getMessage)
+ .collect(Collectors.toSet());
- walOptions.forEach(option -> assertThat(logMessages,
hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB.
Setting option '%s' will be ignored", option))));
+ walOptions.forEach(option -> assertThat(logMessages,
hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB.
Setting option '%s' will be ignored", option))));
+ }
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java:
##########
@@ -480,6 +480,7 @@ public void shouldCreateWriteBatches() {
assertEquals(2, writeBatchMap.size());
for (final WriteBatch batch : writeBatchMap.values()) {
assertEquals(1, batch.count());
+ batch.close();
Review Comment:
Nice catch!
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -253,6 +258,11 @@ private void verifyColumnFamilyOptionsMethodCall(final
Method method) throws Exc
assertThat(undeclaredMockMethodCall.getCause(),
instanceOf(AssertionError.class));
assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
matchesPattern("Unexpected method call ColumnFamilyOptions\\."
+ method.getName() + "(.*)"));
+ } finally {
+ reset(mockedColumnFamilyOptions);
Review Comment:
See my comments above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]