cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260704672
##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
}, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
// Kill instance, delete state to force restoration.
- assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+ assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+ } finally {
+ streams.get().close();
Review Comment:
Look at line 284. There `streams` is used in a lambda expression. You can
only use variables that are final or effectively final in a lambda expression.
There are two ways to satisfy that rule for `streams`: either we use a final
`AtomicReference`, or we assign `streams` to an effectively final variable and
use that in the lambda. I chose the `AtomicReference` option. These are the two
options because we need to initialize the variable outside of the `try`-clause
to have an initialized variable in the `finally`-clause.
However, I have now realized that we can create the Streams client outside the
`try`-clause, because if the constructor fails there is nothing to close or
clean up in the `finally`-clause.
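
To make the trade-off concrete, here is a minimal sketch of both variants. It is
not the code from this PR: `FakeClient` and `LambdaCaptureSketch` are hypothetical
stand-ins introduced only for illustration, and `FakeClient` merely imitates a
client with a `close(Duration)` method.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for a closeable client; not a Kafka class.
final class FakeClient {
    private final List<String> events;

    FakeClient(List<String> events) {
        this.events = events;
        events.add("created");
    }

    String state() {
        return "RUNNING";
    }

    boolean close(Duration timeout) {
        events.add("closed");
        return true;
    }
}

final class LambdaCaptureSketch {

    // Variant 1: a final AtomicReference declared before the try block.
    static List<String> withAtomicReference() {
        List<String> events = new ArrayList<>();
        final AtomicReference<FakeClient> client = new AtomicReference<>();
        try {
            client.set(new FakeClient(events));
            // The lambda captures the final reference, not the client itself.
            Supplier<String> probe = () -> client.get().state();
            events.add(probe.get());
        } finally {
            // The reference is still empty if the constructor threw.
            FakeClient created = client.get();
            if (created != null) {
                created.close(Duration.ofSeconds(60));
            }
        }
        return events;
    }

    // Variant 2: construct the client before the try block.
    static List<String> createdBeforeTry() {
        List<String> events = new ArrayList<>();
        // Assigned exactly once, so the lambda below may capture it directly.
        final FakeClient client = new FakeClient(events);
        try {
            Supplier<String> probe = () -> client.state();
            events.add(probe.get());
        } finally {
            // Reached only if construction succeeded, so no null check is needed.
            client.close(Duration.ofSeconds(60));
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withAtomicReference());
        System.out.println(createdBeforeTry());
    }
}
```

In the second variant a constructor failure propagates before the `try` block is
entered, so the `finally` block never sees an uninitialized client and the
`AtomicReference` indirection is no longer needed.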