This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 988fa3f272d MINOR: Small cleanups in streams tests (#19446)
988fa3f272d is described below

commit 988fa3f272da6ab6c93ee01efbc865f74983a76a
Author: Dmitry Werner <grimekil...@gmail.com>
AuthorDate: Wed Apr 30 04:26:50 2025 +0500

    MINOR: Small cleanups in streams tests (#19446)
    
    - Fixed typos
    - Fixed IDEA code inspection warnings
    - Removed unused fields and methods
    
    Reviewers: PoAn Yang <pay...@apache.org>, TengYao Chi <frankvi...@apache.org>, Matthias J. Sax <matth...@confluent.io>
---
 .../apache/kafka/streams/AutoOffsetResetTest.java  |  2 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 25 +++++----
 .../apache/kafka/streams/StreamsBuilderTest.java   |  6 +--
 .../apache/kafka/streams/StreamsConfigTest.java    | 14 ++---
 .../kafka/streams/processor/ReadOnlyStoreTest.java |  2 +-
 .../internals/CopartitionedTopicsEnforcerTest.java |  6 +--
 .../internals/DefaultStateUpdaterTest.java         | 28 ++--------
 .../internals/GlobalStateManagerImplTest.java      | 60 +++++++++++-----------
 .../internals/GlobalStreamThreadTest.java          |  4 +-
 .../internals/InternalTopicManagerTest.java        | 22 ++++----
 .../internals/InternalTopologyBuilderTest.java     |  6 +--
 .../processor/internals/NamedTopologyTest.java     |  6 +--
 .../processor/internals/ProcessorMetadataTest.java |  4 +-
 .../processor/internals/ProcessorNodeTest.java     |  4 +-
 .../processor/internals/ReadOnlyTaskTest.java      |  4 +-
 .../internals/RepartitionOptimizingTest.java       |  4 --
 .../RepartitionWithMergeOptimizingTest.java        |  4 --
 .../processor/internals/StandbyTaskTest.java       |  2 +-
 .../processor/internals/StateDirectoryTest.java    |  3 --
 .../processor/internals/StateManagerUtilTest.java  |  2 +-
 .../internals/StoreChangelogReaderTest.java        | 10 ++--
 .../processor/internals/StreamTaskTest.java        |  6 +--
 .../processor/internals/StreamThreadTest.java      |  8 +--
 .../internals/StreamsPartitionAssignorTest.java    | 18 -------
 .../processor/internals/StreamsProducerTest.java   |  6 +--
 .../processor/internals/TaskManagerTest.java       |  4 +-
 .../internals/assignment/AssignmentTestUtils.java  |  2 +-
 .../assignment/RackAwareTaskAssignorTest.java      | 14 ++---
 .../assignment/TaskAssignmentUtilsTest.java        | 12 ++---
 .../assignment/TaskAssignorConvergenceTest.java    |  8 +--
 .../internals/metrics/StreamsMetricsImplTest.java  | 60 +++++++++++-----------
 .../internals/AbstractRocksDBWindowStoreTest.java  |  2 +-
 .../CachingPersistentSessionStoreTest.java         | 10 ++--
 .../state/internals/FilteredCacheIteratorTest.java | 16 ++----
 .../internals/MonotonicProcessorRecordContext.java |  6 ---
 .../kafka/streams/state/internals/Murmur3Test.java | 10 ++--
 .../state/internals/ReadOnlyWindowStoreStub.java   | 12 ++---
 ...sToDbOptionsColumnFamilyOptionsAdapterTest.java |  4 +-
 .../state/internals/WindowStoreFetchTest.java      |  3 --
 .../state/internals/WrappingStoreProviderTest.java |  6 +--
 .../kafka/streams/tests/RelationalSmokeTest.java   |  8 +--
 .../kafka/streams/tests/ShutdownDeadlockTest.java  |  8 +--
 .../kafka/streams/tests/SmokeTestClient.java       |  2 +-
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  2 +-
 .../tests/StreamsBrokerDownResilienceTest.java     |  2 +-
 .../streams/tests/StreamsNamedRepartitionTest.java |  2 +-
 .../kafka/test/MockInternalTopicManager.java       |  2 +-
 .../org/apache/kafka/test/MockValueJoiner.java     |  7 +--
 .../test/NoOpValueTransformerWithKeySupplier.java  |  2 +-
 .../kafka/test/ReadOnlySessionStoreStub.java       |  4 +-
 50 files changed, 190 insertions(+), 274 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
index fb4d9738c93..0ef63c0f857 100644
--- a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
@@ -41,7 +41,7 @@ class AutoOffsetResetTest {
     }
 
     @Test
-    void shouldThrowExceptionOnDurationForLastetReset() {
+    void shouldThrowExceptionOnDurationForLatestReset() {
         final AutoOffsetResetInternal latest = new 
AutoOffsetResetInternal(AutoOffsetReset.latest());
         assertThrows(IllegalStateException.class, latest::duration, "Latest 
should not have a duration.");
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1516dfb5eda..499498224bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsNotStartedException;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
@@ -345,7 +344,7 @@ public class KafkaStreamsTest {
         }).when(thread).start();
     }
 
-    private CountDownLatch terminableThreadBlockingLatch = new 
CountDownLatch(1);
+    private final CountDownLatch terminableThreadBlockingLatch = new 
CountDownLatch(1);
 
     private void prepareTerminableThread(final StreamThread thread) throws 
InterruptedException {
         doAnswer(invocation -> {
@@ -561,7 +560,7 @@ public class KafkaStreamsTest {
 
         try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
props, supplier, time)) {
             assertEquals(NUM_THREADS, streams.threads.size());
-            assertEquals(streams.state(), KafkaStreams.State.CREATED);
+            assertEquals(KafkaStreams.State.CREATED, streams.state());
 
             streams.start();
             waitForCondition(
@@ -620,7 +619,7 @@ public class KafkaStreamsTest {
             );
 
             streams.close();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR, 
"KafkaStreams should remain in ERROR state after close.");
+            assertEquals(KafkaStreams.State.ERROR, streams.state(), 
"KafkaStreams should remain in ERROR state after close.");
             assertThat(appender.getMessages(), hasItem(containsString("State 
transition from RUNNING to PENDING_ERROR")));
             assertThat(appender.getMessages(), hasItem(containsString("State 
transition from PENDING_ERROR to ERROR")));
             assertThat(appender.getMessages(), hasItem(containsString("Streams 
client is already in the terminal ERROR state")));
@@ -644,7 +643,7 @@ public class KafkaStreamsTest {
             streams.start();
             final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
             streams.close();
-            assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+            assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
             assertEquals(oldCloseCount + initDiff, 
MockMetricsReporter.CLOSE_COUNT.get());
         }
     }
@@ -875,7 +874,7 @@ public class KafkaStreamsTest {
         prepareThreadState(streamThreadTwo, state2);
         try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            assertThrows(IllegalStateException.class, () -> 
streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+            assertThrows(IllegalStateException.class, () -> 
streams.setUncaughtExceptionHandler(null));
         }
     }
 
@@ -886,7 +885,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            assertThrows(IllegalStateException.class, () -> 
streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+            assertThrows(IllegalStateException.class, () -> 
streams.setUncaughtExceptionHandler(null));
         }
 
     }
@@ -896,7 +895,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadOne, 1);
         prepareStreamThread(streamThreadTwo, 2);
         try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-            assertThrows(NullPointerException.class, () -> 
streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+            assertThrows(NullPointerException.class, () -> 
streams.setUncaughtExceptionHandler(null));
         }
     }
 
@@ -1369,7 +1368,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void shouldGetClientSupplierFromConfigForConstructorWithTime() 
throws Exception {
+    public void shouldGetClientSupplierFromConfigForConstructorWithTime() {
         prepareStreams();
         final AtomicReference<StreamThread.State> state1 = 
prepareStreamThread(streamThreadOne, 1);
         final AtomicReference<StreamThread.State> state2 = 
prepareStreamThread(streamThreadTwo, 2);
@@ -1548,7 +1547,7 @@ public class KafkaStreamsTest {
         try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
props, supplier, time)) {
 
             assertThat(streams.threads.size(), equalTo(0));
-            assertEquals(streams.state(), KafkaStreams.State.CREATED);
+            assertEquals(KafkaStreams.State.CREATED, streams.state());
 
             streams.start();
             waitForCondition(
@@ -1796,7 +1795,7 @@ public class KafkaStreamsTest {
         final AtomicBoolean didAssertGlobalThread = new AtomicBoolean(false);
 
         when(streamThreadOne.clientInstanceIds(any()))
-            .thenReturn(Collections.singletonMap("any-client-1", new 
KafkaFutureImpl<Uuid>() {
+            .thenReturn(Collections.singletonMap("any-client-1", new 
KafkaFutureImpl<>() {
                 @Override
                 public Uuid get(final long timeout, final TimeUnit timeUnit) {
                     didAssertThreadOne.set(true);
@@ -1806,7 +1805,7 @@ public class KafkaStreamsTest {
                 }
             }));
         when(streamThreadTwo.clientInstanceIds(any()))
-            .thenReturn(Collections.singletonMap("any-client-2", new 
KafkaFutureImpl<Uuid>() {
+            .thenReturn(Collections.singletonMap("any-client-2", new 
KafkaFutureImpl<>() {
                 @Override
                 public Uuid get(final long timeout, final TimeUnit timeUnit) {
                     didAssertThreadTwo.set(true);
@@ -1823,7 +1822,7 @@ public class KafkaStreamsTest {
             streams.start();
 
             
when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any()))
-                .thenReturn(new KafkaFutureImpl<Uuid>() {
+                .thenReturn(new KafkaFutureImpl<>() {
                     @Override
                     public Uuid get(final long timeout, final TimeUnit 
timeUnit) {
                         didAssertGlobalThread.set(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index a6c79806319..b839df53167 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -145,7 +145,7 @@ public class StreamsBuilderTest {
             ),
             "topic",
             Consumed.with(Serdes.String(), Serdes.String()),
-            () -> new Processor<String, String, Void, Void>() {
+            () -> new Processor<>() {
                 private KeyValueStore<String, String> store;
 
                 @Override
@@ -454,7 +454,7 @@ public class StreamsBuilderTest {
 
         builder.stream(topic)
                 .groupByKey()
-                .count(Materialized.<Object, Long, KeyValueStore<Bytes, 
byte[]>>as("store"))
+                .count(Materialized.as("store"))
                 .toStream();
 
         builder.build();
@@ -474,7 +474,7 @@ public class StreamsBuilderTest {
 
         builder.stream(topic)
                 .groupByKey()
-                .count(Materialized.<Object, Long, KeyValueStore<Bytes, 
byte[]>>as("store"))
+                .count(Materialized.as("store"))
                 .toStream();
 
         builder.build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a7f657ec54b..f081a768815 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -855,7 +855,7 @@ public class StreamsConfigTest {
 
         try {
             new StreamsConfig(props).getProducerConfigs(clientId);
-            fail("Should throw ConfigException when EOS is enabled and 
maxInFlight cannot be paresed into an integer");
+            fail("Should throw ConfigException when EOS is enabled and 
maxInFlight cannot be parsed into an integer");
         } catch (final ConfigException e) {
             assertEquals(
                 "Invalid value not-a-number for configuration 
max.in.flight.requests.per.connection:" +
@@ -875,8 +875,8 @@ public class StreamsConfigTest {
     @Test
     public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
         final String expectedOptimizeConfig = "none";
-        final String actualOptimizedConifig = 
streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
-        assertEquals(expectedOptimizeConfig, actualOptimizedConifig, 
"Optimization should be \"none\"");
+        final String actualOptimizedConfig = 
streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
+        assertEquals(expectedOptimizeConfig, actualOptimizedConfig, 
"Optimization should be \"none\"");
     }
 
     @Test
@@ -884,8 +884,8 @@ public class StreamsConfigTest {
         final String expectedOptimizeConfig = "all";
         props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all");
         final StreamsConfig config = new StreamsConfig(props);
-        final String actualOptimizedConifig = 
config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
-        assertEquals(expectedOptimizeConfig, actualOptimizedConifig, 
"Optimization should be \"all\"");
+        final String actualOptimizedConfig = 
config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
+        assertEquals(expectedOptimizeConfig, actualOptimizedConfig, 
"Optimization should be \"all\"");
     }
 
     @Test
@@ -1216,13 +1216,13 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldtSetMinTrafficRackAwareAssignmentConfig() {
+    public void shouldSetMinTrafficRackAwareAssignmentConfig() {
         props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
         assertEquals("min_traffic", new 
StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
     }
 
     @Test
-    public void shouldtSetBalanceSubtopologyRackAwareAssignmentConfig() {
+    public void shouldSetBalanceSubtopologyRackAwareAssignmentConfig() {
         props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY);
         assertEquals("balance_subtopology", new 
StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java
index 469dfe04f2b..3b639339902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java
@@ -57,7 +57,7 @@ public class ReadOnlyStoreTest {
             new StringDeserializer(),
             "storeTopic",
             "readOnlyProcessor",
-            () -> new Processor<Integer, String, Void, Void>() {
+            () -> new Processor<>() {
                 KeyValueStore<Integer, String> store;
 
                 @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java
index 42fcc363398..82ff3cea85d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java
@@ -124,8 +124,8 @@ public class CopartitionedTopicsEnforcerTest {
         );
 
         final TreeMap<String, Integer> sorted = new TreeMap<>(
-            Utils.mkMap(Utils.mkEntry(topic1.name(), 
topic1.numberOfPartitions().get()),
-                        Utils.mkEntry(topic2.name(), 
topic2.numberOfPartitions().get()))
+            Utils.mkMap(Utils.mkEntry(topic1.name(), 
topic1.numberOfPartitions().orElseThrow()),
+                        Utils.mkEntry(topic2.name(), 
topic2.numberOfPartitions().orElseThrow()))
         );
 
         assertEquals(String.format("Invalid topology: thread " +
@@ -161,7 +161,7 @@ public class CopartitionedTopicsEnforcerTest {
         assertEquals(String.format("Invalid topology: thread Number of 
partitions [%s] " +
                                    "of repartition topic [%s] " +
                                    "doesn't match number of partitions [%s] of 
the source topic.",
-                                   topic1.numberOfPartitions().get(), 
topic1.name(), 2), ex.getMessage());
+                                   topic1.numberOfPartitions().orElseThrow(), 
topic1.name(), 2), ex.getMessage());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index adc71ebc116..abb128698a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -203,7 +203,7 @@ class DefaultStateUpdaterTest {
         verifyRestoredActiveTasks(restoredTask);
         stateUpdater.shutdown(Duration.ofMinutes(1));
 
-        final IllegalStateException exception = 
assertThrows(IllegalStateException.class, () -> stateUpdater.start());
+        final IllegalStateException exception = 
assertThrows(IllegalStateException.class, stateUpdater::start);
 
         assertEquals("State updater started with non-empty output queues."
             + " This indicates a bug. Please report at 
https://issues.apache.org/jira/projects/KAFKA/issues or to the"
@@ -220,7 +220,7 @@ class DefaultStateUpdaterTest {
         verifyExceptionsAndFailedTasks(new 
ExceptionAndTask(taskCorruptedException, failedTask));
         stateUpdater.shutdown(Duration.ofMinutes(1));
 
-        final IllegalStateException exception = 
assertThrows(IllegalStateException.class, () -> stateUpdater.start());
+        final IllegalStateException exception = 
assertThrows(IllegalStateException.class, stateUpdater::start);
 
         assertEquals("State updater started with non-empty output queues."
             + " This indicates a bug. Please report at 
https://issues.apache.org/jira/projects/KAFKA/issues or to the"
@@ -1785,7 +1785,7 @@ class DefaultStateUpdaterTest {
 
     private void verifyIdle() throws Exception {
         waitForCondition(
-            () -> stateUpdater.isIdle(),
+            stateUpdater::isIdle,
             VERIFICATION_TIMEOUT,
             "State updater did not enter an idling state!"
         );
@@ -1864,26 +1864,4 @@ class DefaultStateUpdaterTest {
         assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
         assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
     }
-    
-    private void verifyRemovedTasks(final Task... tasks) throws Exception {
-        if (tasks.length == 0) {
-            waitForCondition(
-                () -> stateUpdater.removedTasks().isEmpty(),
-                VERIFICATION_TIMEOUT,
-                "Did not get empty removed task within the given timeout!"
-            );
-        } else {
-            final Set<Task> expectedRemovedTasks = Set.of(tasks);
-            final Set<Task> removedTasks = new HashSet<>();
-            waitForCondition(
-                () -> {
-                    removedTasks.addAll(stateUpdater.removedTasks());
-                    return removedTasks.containsAll(expectedRemovedTasks)
-                        && removedTasks.size() == expectedRemovedTasks.size();
-                },
-                VERIFICATION_TIMEOUT,
-                "Did not get all removed task within the given timeout!"
-            );
-        }
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 758f8cd500f..74705840de9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -110,7 +109,6 @@ public class GlobalStateManagerImplTest {
     private ProcessorTopology topology;
     private InternalMockProcessorContext processorContext;
     private Optional<InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?>> 
optionalMockReprocessFactory;
-    private DeserializationExceptionHandler deserializationExceptionHandler;
 
     static ProcessorTopology withGlobalStores(final List<StateStore> 
stateStores,
                                               final Map<String, String> 
storeToChangelogTopic,
@@ -307,7 +305,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize();
         stateManager.registerStore(
-            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, 
Object>(store1) {
+            new WrappedStateStore<>(store1) {
             },
             stateRestoreCallback,
                 null);
@@ -335,7 +333,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize();
         stateManager.registerStore(
-            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, 
Object>(store2) {
+            new WrappedStateStore<>(store2) {
             },
             stateRestoreCallback,
             null);
@@ -424,7 +422,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore<Object, 
Object>(store1.name()) {
+        stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
@@ -434,7 +432,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldCloseStateStores() throws IOException {
+    public void shouldCloseStateStores() {
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
@@ -451,7 +449,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore<Object, 
Object>(store1.name()) {
+        stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
@@ -476,7 +474,7 @@ public class GlobalStateManagerImplTest {
     public void shouldNotCloseStoresIfCloseAlreadyCalled() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore<Object, 
Object>("t1-store") {
+        stateManager.registerStore(new NoOpReadOnlyStore<>("t1-store") {
             @Override
             public void close() {
                 if (!isOpen()) {
@@ -494,7 +492,7 @@ public class GlobalStateManagerImplTest {
     public void shouldAttemptToCloseAllStoresEvenWhenSomeException() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        final NoOpReadOnlyStore<Object, Object> store = new 
NoOpReadOnlyStore<Object, Object>("t1-store") {
+        final NoOpReadOnlyStore<Object, Object> store = new 
NoOpReadOnlyStore<>("t1-store") {
             @Override
             public void close() {
                 super.close();
@@ -598,7 +596,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void 
shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final 
Collection<TopicPartition> partitions) {
                 numberOfCalls.incrementAndGet();
@@ -634,13 +632,13 @@ public class GlobalStateManagerImplTest {
         assertThat(cause, instanceOf(TimeoutException.class));
         assertThat(cause.getMessage(), equalTo("KABOOM!"));
 
-        assertEquals(numberOfCalls.get(), 1);
+        assertEquals(1, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final 
Collection<TopicPartition> partitions) {
                 time.sleep(100L);
@@ -675,13 +673,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make 
progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 2);
+        assertEquals(2, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldRetryWhenEndOffsetsThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final 
Collection<TopicPartition> partitions) {
                 time.sleep(100L);
@@ -716,13 +714,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make 
progress to restore state within 1000 ms. Adjust `task.timeout.ms` if 
needed."));
 
-        assertEquals(numberOfCalls.get(), 11);
+        assertEquals(11, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldNotFailOnSlowProgressWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final 
Collection<TopicPartition> partitions) {
                 time.sleep(1L);
@@ -764,7 +762,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void 
shouldNotRetryWhenPartitionsForThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 numberOfCalls.incrementAndGet();
@@ -800,13 +798,13 @@ public class GlobalStateManagerImplTest {
         assertThat(cause, instanceOf(TimeoutException.class));
         assertThat(cause.getMessage(), equalTo("KABOOM!"));
 
-        assertEquals(numberOfCalls.get(), 1);
+        assertEquals(1, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldRetryAtLeastOnceWhenPartitionsForThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 time.sleep(100L);
@@ -841,13 +839,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make 
progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 2);
+        assertEquals(2, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldRetryWhenPartitionsForThrowsTimeoutExceptionUntilTaskTimeoutExpires() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 time.sleep(100L);
@@ -882,13 +880,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make 
progress to restore state within 1000 ms. Adjust `task.timeout.ms` if 
needed."));
 
-        assertEquals(numberOfCalls.get(), 11);
+        assertEquals(11, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldNotFailOnSlowProgressWhenPartitionForThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 time.sleep(1L);
@@ -930,7 +928,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void 
shouldNotRetryWhenPositionThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized long position(final TopicPartition partition) {
                 numberOfCalls.incrementAndGet();
@@ -966,13 +964,13 @@ public class GlobalStateManagerImplTest {
         assertThat(cause, instanceOf(TimeoutException.class));
         assertThat(cause.getMessage(), equalTo("KABOOM!"));
 
-        assertEquals(numberOfCalls.get(), 1);
+        assertEquals(1, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryAtLeastOnceWhenPositionThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized long position(final TopicPartition partition) {
                 time.sleep(100L);
@@ -1007,13 +1005,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make 
progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 2);
+        assertEquals(2, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldRetryWhenPositionThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized long position(final TopicPartition partition) {
                 time.sleep(100L);
@@ -1048,13 +1046,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make 
progress to restore state within 1000 ms. Adjust `task.timeout.ms` if 
needed."));
 
-        assertEquals(numberOfCalls.get(), 11);
+        assertEquals(11, numberOfCalls.get());
     }
 
     @Test
     public void 
shouldNotFailOnSlowProgressWhenPositionThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized long position(final TopicPartition partition) {
                 time.sleep(1L);
@@ -1090,7 +1088,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void 
shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore()
 {
-        consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) 
{
             @Override
             public synchronized ConsumerRecords<byte[], byte[]> poll(final 
Duration timeout) {
                 time.sleep(timeout.toMillis());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index e4f78c900d1..419d6b01013 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -103,7 +103,7 @@ public class GlobalStreamThreadTest {
             );
 
         final ProcessorSupplier<Object, Object, Void, Void> processorSupplier 
= () ->
-            new ContextualProcessor<Object, Object, Void, Void>() {
+            new ContextualProcessor<>() {
                 @Override
                 public void process(final Record<Object, Object> record) {
                 }
@@ -163,7 +163,7 @@ public class GlobalStreamThreadTest {
 
     @Test
     public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() 
throws Exception {
-        final MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 throw new RuntimeException("KABOOM!");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 14470db2efa..c6d4cf4d239 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -96,7 +96,7 @@ import static org.mockito.Mockito.when;
 public class InternalTopicManagerTest {
     private final Node broker1 = new Node(0, "dummyHost-1", 1234);
     private final Node broker2 = new Node(1, "dummyHost-2", 1234);
-    private final List<Node> cluster = new ArrayList<Node>(2) {
+    private final List<Node> cluster = new ArrayList<>(2) {
         {
             add(broker1);
             add(broker2);
@@ -115,7 +115,7 @@ public class InternalTopicManagerTest {
     private InternalTopicManager internalTopicManager;
     private final MockTime time = new MockTime(0);
 
-    private final Map<String, Object> config = new HashMap<String, Object>() {
+    private final Map<String, Object> config = new HashMap<>() {
         {
             put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
             put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + 
broker1.port());
@@ -361,8 +361,8 @@ public class InternalTopicManagerTest {
         final InternalTopicManager internalTopicManager =
                 new InternalTopicManager(time, mockAdminClient, new 
StreamsConfig(config));
         try {
-            final Set<String> topic1set = new 
HashSet<String>(Collections.singletonList(topic1));
-            final Set<String> topic2set = new 
HashSet<String>(Collections.singletonList(topic2));
+            final Set<String> topic1set = new 
HashSet<>(Collections.singletonList(topic1));
+            final Set<String> topic2set = new 
HashSet<>(Collections.singletonList(topic2));
 
             internalTopicManager.getNumPartitions(topic1set, topic2set);
 
@@ -373,8 +373,8 @@ public class InternalTopicManagerTest {
         mockAdminClient.timeoutNextRequest(1);
 
         try {
-            final Set<String> topic1set = new 
HashSet<String>(Collections.singletonList(topic1));
-            final Set<String> topic2set = new 
HashSet<String>(Collections.singletonList(topic2));
+            final Set<String> topic1set = new 
HashSet<>(Collections.singletonList(topic1));
+            final Set<String> topic2set = new 
HashSet<>(Collections.singletonList(topic2));
 
             internalTopicManager.getNumPartitions(topic1set, topic2set);
 
@@ -710,22 +710,22 @@ public class InternalTopicManagerTest {
         internalTopicManager.makeReady(Collections.singletonMap(topic4, 
topicConfig4));
 
         assertEquals(Set.of(topic1, topic2, topic3, topic4), 
mockAdminClient.listTopics().names().get());
-        assertEquals(new TopicDescription(topic1, false, new 
ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic1, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
         }), 
mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get());
-        assertEquals(new TopicDescription(topic2, false, new 
ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic2, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
         }), 
mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get());
-        assertEquals(new TopicDescription(topic3, false, new 
ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic3, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
         }), 
mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get());
-        assertEquals(new TopicDescription(topic4, false, new 
ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic4, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
@@ -804,7 +804,7 @@ public class InternalTopicManagerTest {
         mockAdminClient.addTopic(
             false,
             topic1,
-            new ArrayList<TopicPartitionInfo>() {
+            new ArrayList<>() {
                 {
                     add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList()));
                     add(new TopicPartitionInfo(1, broker1, singleReplica, 
Collections.emptyList()));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 366aa6636d0..86264b59bf7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -116,7 +116,7 @@ public class InternalTopologyBuilderTest {
         assertThat(builder.offsetResetStrategy(earliestTopic), 
equalTo(AutoOffsetResetStrategy.EARLIEST));
         assertThat(builder.offsetResetStrategy(latestTopic), 
equalTo(AutoOffsetResetStrategy.LATEST));
         assertThat(builder.offsetResetStrategy(durationTopic).type(), 
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
-        
assertThat(builder.offsetResetStrategy(durationTopic).duration().get().toSeconds(),
 equalTo(42L));
+        
assertThat(builder.offsetResetStrategy(durationTopic).duration().orElseThrow().toSeconds(),
 equalTo(42L));
     }
 
     @Test
@@ -127,7 +127,7 @@ public class InternalTopologyBuilderTest {
         final String durationTopicPattern = "duration.*Topic";
 
         builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()), 
"source0", null, null, null, Pattern.compile(noneTopicPattern));
-        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "sourc1", null, null, 
null, Pattern.compile(earliestTopicPattern));
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source1", null, null, 
null, Pattern.compile(earliestTopicPattern));
         builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null, 
 Pattern.compile(latestTopicPattern));
         builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))), 
"source3", null, null, null, Pattern.compile(durationTopicPattern));
 
@@ -137,7 +137,7 @@ public class InternalTopologyBuilderTest {
         assertThat(builder.offsetResetStrategy("earliestTestTopic"), 
equalTo(AutoOffsetResetStrategy.EARLIEST));
         assertThat(builder.offsetResetStrategy("latestTestTopic"), 
equalTo(AutoOffsetResetStrategy.LATEST));
         assertThat(builder.offsetResetStrategy("durationTestTopic").type(), 
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
-        
assertThat(builder.offsetResetStrategy("durationTestTopic").duration().get().toSeconds(),
 equalTo(42L));
+        
assertThat(builder.offsetResetStrategy("durationTestTopic").duration().orElseThrow().toSeconds(),
 equalTo(42L));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
index 4c1110bac03..43a59b7057c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
@@ -107,9 +107,9 @@ public class NamedTopologyTest {
         final NamedTopology topology2 = builder2.build();
         final NamedTopology topology3 = builder3.build();
         streams.start(asList(topology1, topology2, topology3));
-        assertThat(streams.getTopologyByName("topology-1").get(), 
equalTo(topology1));
-        assertThat(streams.getTopologyByName("topology-2").get(), 
equalTo(topology2));
-        assertThat(streams.getTopologyByName("topology-3").get(), 
equalTo(topology3));
+        assertThat(streams.getTopologyByName("topology-1").orElseThrow(), 
equalTo(topology1));
+        assertThat(streams.getTopologyByName("topology-2").orElseThrow(), 
equalTo(topology2));
+        assertThat(streams.getTopologyByName("topology-3").orElseThrow(), 
equalTo(topology3));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java
index b3810092e1e..edd276e850b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ProcessorMetadataTest {
 
     @Test
-    public void shouldAddandGetKeyValueWithEmptyConstructor() {
+    public void shouldAddAndGetKeyValueWithEmptyConstructor() {
         final ProcessorMetadata metadata = new ProcessorMetadata();
         final String key = "some_key";
         final long value = 100L;
@@ -46,7 +46,7 @@ public class ProcessorMetadataTest {
     }
 
     @Test
-    public void shouldAddandGetKeyValueWithExistingMeta() {
+    public void shouldAddAndGetKeyValueWithExistingMeta() {
         final Map<String, Long> map = new HashMap<>();
         map.put("key1", 1L);
         map.put("key2", 2L);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 5b4303a1695..86f617e7f34 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -106,7 +106,7 @@ public class ProcessorNodeTest {
         final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
 
-        assertTrue(failedProcessingException.getCause() instanceof 
RuntimeException);
+        assertInstanceOf(RuntimeException.class, 
failedProcessingException.getCause());
         assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
             failedProcessingException.getCause().getMessage());
         assertEquals(NAME, 
failedProcessingException.failedProcessorNodeName());
@@ -310,7 +310,7 @@ public class ProcessorNodeTest {
             StreamsException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))
         );
-        assertTrue(se.getCause() instanceof ClassCastException);
+        assertInstanceOf(ClassCastException.class, se.getCause());
         assertTrue(se.getMessage().contains("default Serdes"));
         assertTrue(se.getMessage().contains("input types"));
         assertTrue(se.getMessage().contains("pname"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
index 896e84baf5b..c95cf8c5a56 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -41,7 +41,7 @@ import static org.mockito.Mockito.verify;
 
 class ReadOnlyTaskTest {
 
-    private final List<String> readOnlyMethods = new LinkedList<String>() {
+    private final List<String> readOnlyMethods = new LinkedList<>() {
         {
             add("needsInitializationOrRestoration");
             add("inputPartitions");
@@ -56,7 +56,7 @@ class ReadOnlyTaskTest {
         }
     };
 
-    private final List<String> objectMethods = new LinkedList<String>() {
+    private final List<String> objectMethods = new LinkedList<>() {
         {
             add("wait");
             add("equals");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 91b95b87a7f..afb89fa3c8f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -50,8 +50,6 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -72,8 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @SuppressWarnings("deprecation")
 public class RepartitionOptimizingTest {
 
-    private static final Logger log = 
LoggerFactory.getLogger(RepartitionOptimizingTest.class);
-
     private static final String INPUT_TOPIC = "input";
     private static final String COUNT_TOPIC = "outputTopic_0";
     private static final String AGGREGATION_TOPIC = "outputTopic_1";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index a9287fc3e59..e62986f4098 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -42,8 +42,6 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -60,8 +58,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class RepartitionWithMergeOptimizingTest {
 
-    private static final Logger log = 
LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class);
-
     private static final String INPUT_A_TOPIC = "inputA";
     private static final String INPUT_B_TOPIC = "inputB";
     private static final String COUNT_TOPIC = "outputTopic_0";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 768f3787d0b..6be27187f0f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -178,7 +178,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws 
IOException {
+    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
         stateDirectory = mock(StateDirectory.class);
         when(stateDirectory.lock(taskId)).thenReturn(false);
         when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 8e7ae80af35..f850093d389 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -41,8 +41,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -95,7 +93,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class StateDirectoryTest {
 
-    private static final Logger log = 
LoggerFactory.getLogger(StateDirectoryTest.class);
     private final MockTime time = new MockTime();
     private File stateDir;
     private final String applicationId = "applicationId";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 4bb8ca5b2da..5d0acf46bd5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -184,7 +184,7 @@ public class StateManagerUtilTest {
         doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
         when(stateManager.baseDir()).thenReturn(randomFile);
 
-        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+        try (MockedStatic<Utils> ignored = mockStatic(Utils.class)) {
             assertThrows(ProcessorStateException.class, () ->
                     StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 320494c5348..69655b4d642 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -389,7 +389,7 @@ public class StoreChangelogReaderTest {
 
             adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
 
-            final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+            final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
                 @Override
                 public long position(final TopicPartition partition) {
                     throw new TimeoutException("KABOOM!");
@@ -674,7 +674,7 @@ public class StoreChangelogReaderTest {
         when(activeStateManager.taskId()).thenReturn(taskId);
 
         final AtomicBoolean clearException = new AtomicBoolean(false);
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public long position(final TopicPartition partition) {
                 if (clearException.get()) {
@@ -720,7 +720,7 @@ public class StoreChangelogReaderTest {
         when(activeStateManager.taskId()).thenReturn(taskId);
         when(storeMetadata.offset()).thenReturn(10L);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public long position(final TopicPartition partition) {
                 throw kaboom;
@@ -770,7 +770,7 @@ public class StoreChangelogReaderTest {
         };
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public Map<TopicPartition, OffsetAndMetadata> committed(final 
Set<TopicPartition> partitions) {
                 throw new AssertionError("Should not trigger this function");
@@ -928,7 +928,7 @@ public class StoreChangelogReaderTest {
 
     @Test
     public void shouldThrowIfUnsubscribeFail() {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public void unsubscribe() {
                 throw kaboom;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 98807cd6342..fda9afa9a88 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -766,7 +766,7 @@ public class StreamTaskTest {
         metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
 
         // Create a processor that only forwards even keys to test the metrics 
at the source and terminal nodes
-        final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = 
new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+        final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = 
new MockSourceNode<>(intDeserializer, intDeserializer) {
             InternalProcessorContext<Integer, Integer> context;
 
             @Override
@@ -2045,7 +2045,7 @@ public class StreamTaskTest {
     public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        final Consumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final Consumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public Map<TopicPartition, OffsetAndMetadata> committed(final 
Set<TopicPartition> partitions) {
                 throw new KafkaException("KABOOM!");
@@ -3109,7 +3109,7 @@ public class StreamTaskTest {
             singletonList(stateStore),
             emptyMap());
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public Map<TopicPartition, OffsetAndMetadata> committed(final 
Set<TopicPartition> partitions) {
                 throw new TimeoutException("KABOOM!");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6fc8b4efb30..8cb2fc8cdfe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2238,7 +2238,7 @@ public class StreamThreadTest {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
         final ProcessorSupplier<Object, Object, Void, Void> punctuateProcessor 
=
-            () -> new ContextualProcessor<Object, Object, Void, Void>() {
+            () -> new ContextualProcessor<>() {
                 @Override
                 public void init(final ProcessorContext<Void, Void> context) {
                     context.schedule(Duration.ofMillis(100L), 
PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
@@ -2506,7 +2506,7 @@ public class StreamThreadTest {
 
         if (stateUpdaterEnabled) {
             TestUtils.waitForCondition(
-                () -> mockRestoreConsumer.assignment().size() == 0,
+                () -> mockRestoreConsumer.assignment().isEmpty(),
                 "Never get the assignment");
         } else {
             TestUtils.waitForCondition(
@@ -3444,7 +3444,7 @@ public class StreamThreadTest {
 
     @ParameterizedTest
     @MethodSource("data")        
-    public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
Exception {
+    public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
 
@@ -3560,7 +3560,7 @@ public class StreamThreadTest {
 
     @ParameterizedTest
     @MethodSource("data")        
-    public void shouldTimeOutOnProducerInstanceId(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
+    public void shouldTimeOutOnProducerInstanceId(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final MockProducer<byte[], byte[]> producer = new MockProducer<>();
         producer.setClientInstanceId(Uuid.randomUuid());
         producer.injectTimeoutException(-1);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 29fa204a579..b34d1408c56 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.OffsetSpec;
-import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
@@ -2819,23 +2818,6 @@ public class StreamsPartitionAssignorTest {
         return changelogEndOffsets;
     }
 
-    private static Map<String, TopicDescription> getTopicDescriptionMap(final 
List<String> changelogTopics,
-                                                                        final 
List<List<TopicPartitionInfo>> topicPartitionInfos) {
-        if (changelogTopics.size() != topicPartitionInfos.size()) {
-            throw new IllegalStateException("Passed in " + 
changelogTopics.size() + " changelog topic names, but " +
-                topicPartitionInfos.size() + " different topicPartitionInfo 
for the topics");
-        }
-        final Map<String, TopicDescription> changeLogTopicDescriptions = new 
HashMap<>();
-        for (int i = 0; i < changelogTopics.size(); i++) {
-            final String topic = changelogTopics.get(i);
-            final List<TopicPartitionInfo> topicPartitionInfo = 
topicPartitionInfos.get(i);
-            changeLogTopicDescriptions.put(topic, new TopicDescription(topic, 
false, topicPartitionInfo));
-        }
-
-        return changeLogTopicDescriptions;
-    }
-
-
     private static SubscriptionInfo getInfoForOlderVersion(final int version,
                                                            final ProcessId 
processId,
                                                            final Set<TaskId> 
prevTasks,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index 6c6362ee8bc..1c084fa63e2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -920,8 +920,8 @@ public class StreamsProducerTest {
             METADATA_WAIT_TIME;
         assertThat(eosStreamsProducer.totalBlockedTime(), 
equalTo(expectedTotalBlocked));
         final long closeStart = 1L;
-        final long clodeDelay = 1L;
-        
when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + 
clodeDelay);
+        final long closeDelay = 1L;
+        
when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + 
closeDelay);
         eosStreamsProducer.resetProducer(eosMockProducer);
         setProducerMetrics(
             eosMockProducer,
@@ -937,7 +937,7 @@ public class StreamsProducerTest {
 
         assertThat(
             eosStreamsProducer.totalBlockedTime(),
-            closeTo(2 * expectedTotalBlocked + clodeDelay, 0.01)
+            closeTo(2 * expectedTotalBlocked + closeDelay, 0.01)
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index d8bb35c000a..68abf455a76 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1678,7 +1678,7 @@ public class TaskManagerTest {
         );
 
         assertEquals(exception, thrown);
-        assertEquals(statefulTask.id(), thrown.taskId().get());
+        assertEquals(statefulTask.id(), thrown.taskId().orElseThrow());
     }
 
     @Test
@@ -2149,7 +2149,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception 
{
+    public void shouldCloseActiveTasksWhenHandlingLostTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 7c7047b7516..206a6cc796d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -523,7 +523,7 @@ public final class AssignmentTestUtils {
     static <V> Matcher<ClientState> hasProperty(final String propertyName,
                                                 final Function<ClientState, V> 
propertyExtractor,
                                                 final V propertyValue) {
-        return new BaseMatcher<ClientState>() {
+        return new BaseMatcher<>() {
             @Override
             public void describeTo(final Description description) {
                 
description.appendText(propertyName).appendText(":").appendValue(propertyValue);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java
index 381ae972aec..ccc904b5e81 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java
@@ -826,7 +826,7 @@ public class RackAwareTaskAssignorTest {
 
     @ParameterizedTest
     @MethodSource("paramStoreType")
-    public void shouldThrowIfMissingCallcanEnableRackAwareAssignor(final 
boolean stateful, final String assignmentStrategy) {
+    public void shouldThrowIfMissingCallCanEnableRackAwareAssignor(final 
boolean stateful, final String assignmentStrategy) {
         setUp(stateful);
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterForAllTopics(),
@@ -1125,28 +1125,28 @@ public class RackAwareTaskAssignorTest {
         setUp(stateful);
         final int nodeSize = 50;
         final int tpSize = 60;
-        final int partionSize = 3;
+        final int partitionSize = 3;
         final int clientSize = 50;
         final int replicaCount = 3;
         final int maxCapacity = 3;
         final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = 
getTaskTopicPartitionMap(
-            tpSize, partionSize, false);
+            tpSize, partitionSize, false);
         final AssignmentConfigs assignorConfiguration = 
getRackAwareEnabledConfigWithStandby(replicaCount, assignmentStrategy);
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
-            getRandomCluster(nodeSize, tpSize, partionSize),
+            getRandomCluster(nodeSize, tpSize, partitionSize),
             taskTopicPartitionMap,
-            getTaskTopicPartitionMap(tpSize, partionSize, true),
+            getTaskTopicPartitionMap(tpSize, partitionSize, true),
             getTopologyGroupTaskMap(),
             getRandomProcessRacks(clientSize, nodeSize),
-            mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, 
partionSize),
+            mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, 
partitionSize),
             assignorConfiguration,
             time
         );
 
         final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) 
taskTopicPartitionMap.keySet();
         final SortedMap<ProcessId, ClientState> clientStateMap = 
getRandomClientState(clientSize,
-            tpSize, partionSize, maxCapacity, taskIds);
+            tpSize, partitionSize, maxCapacity, taskIds);
 
         final StandbyTaskAssignor standbyTaskAssignor = 
StandbyTaskAssignorFactory.create(
             assignorConfiguration, assignor);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
index a42f4396041..2aba4da020a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
@@ -544,13 +544,11 @@ public class TaskAssignmentUtilsTest {
             new TopicPartition(String.format("test-topic-%d", 
taskId.subtopology()), taskId.partition()),
             true,
             true,
-            () -> {
-                partitions.forEach(partition -> {
-                    if (partition != null && rackIds != null) {
-                        partition.annotateWithRackIds(rackIds);
-                    }
-                });
-            }
+            () -> partitions.forEach(partition -> {
+                if (partition != null && rackIds != null) {
+                    partition.annotateWithRackIds(rackIds);
+                }
+            })
         ));
         return mkEntry(
             taskId,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 936c9eb6a85..39d687f3cdc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -425,7 +425,7 @@ public class TaskAssignorConvergenceTest {
         testForConvergence(harness, configs, numStatefulTasks / 
maxWarmupReplicas + 1);
         verifyValidAssignment(numStandbyReplicas, harness);
 
-        // min-cost rack aware assignor doesn't balance subtopolgy
+        // min-cost rack aware assignor doesn't balance subtopology
         if 
(!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC))
 {
             verifyBalancedAssignment(harness, skewThreshold);
         }
@@ -463,7 +463,7 @@ public class TaskAssignorConvergenceTest {
 
         verifyValidAssignment(numStandbyReplicas, harness);
 
-        // min-cost rack aware assignor doesn't balance subtopolgy
+        // min-cost rack aware assignor doesn't balance subtopology
         if 
(!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC))
 {
             verifyBalancedAssignment(harness, skewThreshold);
         }
@@ -520,7 +520,7 @@ public class TaskAssignorConvergenceTest {
             testForConvergence(harness, configs, 1);
             verifyValidAssignment(numStandbyReplicas, harness);
 
-            // min-cost rack aware assignor doesn't balance subtopolgy
+            // min-cost rack aware assignor doesn't balance subtopology
             if 
(!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC))
 {
                 verifyBalancedAssignment(harness, skewThreshold);
             }
@@ -540,7 +540,7 @@ public class TaskAssignorConvergenceTest {
                 if (!harness.clientStates.isEmpty()) {
                     testForConvergence(harness, configs, 2 * (numStatefulTasks 
+ numStatefulTasks * numStandbyReplicas));
                     verifyValidAssignment(numStandbyReplicas, harness);
-                    // min-cost rack aware assignor doesn't balance subtopolgy
+                    // min-cost rack aware assignor doesn't balance subtopology
                     if 
(!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC))
 {
                         verifyBalancedAssignment(harness, skewThreshold);
                     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index c6bf68e4df5..bdb9d028c54 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -99,9 +99,7 @@ public class StreamsMetricsImplTest {
     private static final String TASK_ID1 = "test-task-1";
     private static final String TASK_ID2 = "test-task-2";
     private static final String NODE_ID1 = "test-node-1";
-    private static final String NODE_ID2 = "test-node-2";
     private static final String TOPIC_ID1 = "test-topic-1";
-    private static final String TOPIC_ID2 = "test-topic-2";
     private static final String METRIC_NAME1 = "test-metric1";
     private static final String METRIC_NAME2 = "test-metric2";
     private static final String THREAD_ID_TAG = "thread-id";
@@ -236,8 +234,7 @@ public class StreamsMetricsImplTest {
         return sensorKeys;
     }
 
-    private ArgumentCaptor<String> setupGetNewSensorTest(final Metrics metrics,
-                                                  final RecordingLevel 
recordingLevel) {
+    private ArgumentCaptor<String> setupGetNewSensorTest(final Metrics 
metrics) {
         final ArgumentCaptor<String> sensorKey = 
ArgumentCaptor.forClass(String.class);
         when(metrics.getSensor(sensorKey.capture())).thenReturn(null);
         final Sensor[] parents = {};
@@ -253,7 +250,7 @@ public class StreamsMetricsImplTest {
     public void shouldGetNewThreadLevelSensor() {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        setupGetNewSensorTest(metrics, recordingLevel);
+        setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -277,7 +274,7 @@ public class StreamsMetricsImplTest {
     public void shouldGetNewTaskLevelSensor() {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        setupGetNewSensorTest(metrics, recordingLevel);
+        setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
@@ -311,7 +308,7 @@ public class StreamsMetricsImplTest {
     public void shouldGetNewTopicLevelSensor() {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        setupGetNewSensorTest(metrics, recordingLevel);
+        setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
@@ -349,7 +346,7 @@ public class StreamsMetricsImplTest {
     public void shouldGetNewStoreLevelSensorIfNoneExists() {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        final ArgumentCaptor<String> sensorKeys = 
setupGetNewSensorTest(metrics, recordingLevel);
+        final ArgumentCaptor<String> sensorKeys = 
setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
@@ -477,10 +474,11 @@ public class StreamsMetricsImplTest {
         final MetricName metricName =
                 new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricConfig metricConfig = new 
MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
-        final Metrics metrics = new Metrics(metricConfig);
-        assertNull(metrics.metric(metricName));
-        metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
-        assertNotNull(metrics.metric(metricName));
+        try (Metrics metrics = new Metrics(metricConfig)) {
+            assertNull(metrics.metric(metricName));
+            metrics.addMetricIfAbsent(metricName, metricConfig, 
VALUE_PROVIDER);
+            assertNotNull(metrics.metric(metricName));
+        }
     }
 
     @Test
@@ -509,10 +507,11 @@ public class StreamsMetricsImplTest {
         final MetricName metricName =
                 new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricConfig metricConfig = new 
MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
-        final Metrics metrics = new Metrics(metricConfig);
-        assertNull(metrics.metric(metricName));
-        final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, 
metricConfig, VALUE_PROVIDER);
-        assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, 
metricConfig, VALUE_PROVIDER));
+        try (Metrics metrics = new Metrics(metricConfig)) {
+            assertNull(metrics.metric(metricName));
+            final KafkaMetric kafkaMetric = 
metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
+            assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, 
metricConfig, VALUE_PROVIDER));
+        }
     }
 
     @Test
@@ -520,20 +519,21 @@ public class StreamsMetricsImplTest {
         final MetricName metricName =
                 new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricConfig metricConfig = new 
MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
-        final Metrics metrics = new Metrics(metricConfig);
-        assertNull(metrics.metric(metricName));
-        final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new 
AtomicReference<>();
-        final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new 
AtomicReference<>();
+        try (Metrics metrics = new Metrics(metricConfig)) {
+            assertNull(metrics.metric(metricName));
+            final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new 
AtomicReference<>();
+            final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new 
AtomicReference<>();
 
-        final Thread thread1 = new Thread(() -> 
metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, 
VALUE_PROVIDER)));
-        final Thread thread2 = new Thread(() -> 
metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, 
VALUE_PROVIDER)));
+            final Thread thread1 = new Thread(() -> 
metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, 
VALUE_PROVIDER)));
+            final Thread thread2 = new Thread(() -> 
metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, 
VALUE_PROVIDER)));
 
-        thread1.start();
-        thread2.start();
+            thread1.start();
+            thread2.start();
 
-        thread1.join();
-        thread2.join();
-        assertEquals(metricCreatedViaThread1.get(), 
metricCreatedViaThread2.get());
+            thread1.join();
+            thread2.join();
+            assertEquals(metricCreatedViaThread1.get(), 
metricCreatedViaThread2.get());
+        }
     }
 
     @Test
@@ -561,7 +561,7 @@ public class StreamsMetricsImplTest {
     public void shouldGetNewNodeLevelSensor() {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        setupGetNewSensorTest(metrics, recordingLevel);
+        setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
@@ -598,7 +598,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
-        setupGetNewSensorTest(metrics, recordingLevel);
+        setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
@@ -634,7 +634,7 @@ public class StreamsMetricsImplTest {
     public void shouldGetNewClientLevelSensor() {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        setupGetNewSensorTest(metrics, recordingLevel);
+        setupGetNewSensorTest(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
index 7760d32e431..50964febf63 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
@@ -529,7 +529,7 @@ public abstract class AbstractRocksDBWindowStoreTest 
extends AbstractWindowBytes
         }
 
         // the latest record has a timestamp > 60k. So, the +1 in actualFrom 
calculation in
-        // RocksDbWindowStore shouldn't have an implciation and all stores 
should return the same fetched counts.
+        // RocksDbWindowStore shouldn't have an implication and all stores 
should return the same fetched counts.
         assertEquals(1, fetchedCount);
         assertEquals(
                 Set.of(segments.segmentName(3L), segments.segmentName(5L)),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 0db31dfe039..159503b920a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -879,13 +879,13 @@ public class CachingPersistentSessionStoreTest {
 
     public static class CacheFlushListenerStub<K, V> implements 
CacheFlushListener<byte[], byte[]> {
         private final Deserializer<K> keyDeserializer;
-        private final Deserializer<V> valueDesializer;
+        private final Deserializer<V> valueDeserializer;
         private final List<KeyValueTimestamp<K, Change<V>>> forwarded = new 
LinkedList<>();
 
         CacheFlushListenerStub(final Deserializer<K> keyDeserializer,
-                               final Deserializer<V> valueDesializer) {
+                               final Deserializer<V> valueDeserializer) {
             this.keyDeserializer = keyDeserializer;
-            this.valueDesializer = valueDesializer;
+            this.valueDeserializer = valueDeserializer;
         }
 
         @Override
@@ -894,8 +894,8 @@ public class CachingPersistentSessionStoreTest {
                 new KeyValueTimestamp<>(
                     keyDeserializer.deserialize(null, record.key()),
                     new Change<>(
-                        valueDesializer.deserialize(null, 
record.value().newValue),
-                        valueDesializer.deserialize(null, 
record.value().oldValue)),
+                        valueDeserializer.deserialize(null, 
record.value().newValue),
+                        valueDeserializer.deserialize(null, 
record.value().oldValue)),
                     record.timestamp()
                 )
             );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
index 9c8fd2ce5f3..ae59fdbf214 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.GenericInMemoryKeyValueStore;
 
@@ -27,6 +26,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Arrays.asList;
@@ -67,22 +67,12 @@ public class FilteredCacheIteratorTest {
     @BeforeEach
     public void before() {
         store.putAll(entries);
-        final HasNextCondition allCondition = new HasNextCondition() {
-            @Override
-            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                return iterator.hasNext();
-            }
-        };
+        final HasNextCondition allCondition = Iterator::hasNext;
         allIterator = new FilteredCacheIterator(
             new DelegatingPeekingKeyValueIterator<>("",
                                                     store.all()), 
allCondition, IDENTITY_FUNCTION);
 
-        final HasNextCondition firstEntryCondition = new HasNextCondition() {
-            @Override
-            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                return iterator.hasNext() && 
iterator.peekNextKey().equals(firstEntry.key);
-            }
-        };
+        final HasNextCondition firstEntryCondition = iterator -> 
iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key);
         firstEntryIterator = new FilteredCacheIterator(
                 new DelegatingPeekingKeyValueIterator<>("",
                                                         store.all()), 
firstEntryCondition, IDENTITY_FUNCTION);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java
index ec1814ebd9c..2a57c36659d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java
@@ -42,10 +42,4 @@ public class MonotonicProcessorRecordContext extends 
ProcessorRecordContext {
         }
         return ret;
     }
-
-    public void kick() {
-        if (!automatic) {
-            counter++;
-        }
-    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java
index b583148ab47..670a980d9ce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java
@@ -44,8 +44,8 @@ public class Murmur3Test {
         cases.put(new byte[]{'a', 'b', 'c'}, 461137560);
 
         int seed = 123;
-        for (Map.Entry c : cases.entrySet()) {
-            byte[] b = (byte[]) c.getKey();
+        for (Map.Entry<byte[], Integer> c : cases.entrySet()) {
+            byte[] b = c.getKey();
             assertEquals(c.getValue(), Murmur3.hash32(b, b.length, seed));
         }
     }
@@ -62,10 +62,10 @@ public class Murmur3Test {
 
         int seed = 123;
 
-        for (Map.Entry c : cases.entrySet()) {
-            byte[] b = (byte[]) c.getKey();
+        for (Map.Entry<byte[], long[]> c : cases.entrySet()) {
+            byte[] b = c.getKey();
             long[] result = Murmur3.hash128(b, 0, b.length, seed);
-            assertArrayEquals((long[]) c.getValue(), result);
+            assertArrayEquals(c.getValue(), result);
         }
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index c574950ac90..25d7fa3a68b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -110,7 +110,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
         final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
 
-        return new KeyValueIterator<Windowed<K>, V>() {
+        return new KeyValueIterator<>() {
             @Override
             public void close() {
             }
@@ -149,7 +149,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
         final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
 
-        return new KeyValueIterator<Windowed<K>, V>() {
+        return new KeyValueIterator<>() {
             @Override
             public void close() {
             }
@@ -191,7 +191,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
         final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
 
-        return new KeyValueIterator<Windowed<K>, V>() {
+        return new KeyValueIterator<>() {
             @Override
             public void close() {
             }
@@ -235,7 +235,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
         final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
 
-        return new KeyValueIterator<Windowed<K>, V>() {
+        return new KeyValueIterator<>() {
             @Override
             public void close() {
             }
@@ -286,7 +286,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
         final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
 
-        return new KeyValueIterator<Windowed<K>, V>() {
+        return new KeyValueIterator<>() {
             @Override
             public void close() {
             }
@@ -342,7 +342,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         }
         final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
 
-        return new KeyValueIterator<Windowed<K>, V>() {
+        return new KeyValueIterator<>() {
             @Override
             public void close() {
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
index 08248b02054..3ce8cb63efc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -81,7 +81,7 @@ import static org.mockito.Mockito.mockingDetails;
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
 
-    private final List<String> walRelatedMethods = new LinkedList<String>() {
+    private final List<String> walRelatedMethods = new LinkedList<>() {
         {
             add("setManualWalFlush");
             add("setMaxTotalWalSize");
@@ -94,7 +94,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
         }
     };
 
-    private final List<String> ignoreMethods = new LinkedList<String>() {
+    private final List<String> ignoreMethods = new LinkedList<>() {
         {
             add("isOwningHandle");
             add("getNativeHandle");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
index bdcdd2d6d8c..08391465a5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -88,8 +87,6 @@ public class WindowStoreFetchTest {
     private String innerLowBetween;
     private String innerHighBetween;
 
-    private TimeWindowedKStream<String, String> windowedStream;
-
     public void setup(final StoreType storeType, 
                       final boolean enableLogging, 
                       final boolean enableCaching, 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index dce7d417c09..ab868b672d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -65,7 +65,7 @@ public class WrappingStoreProviderTest {
     @Test
     public void shouldFindKeyValueStores() {
         final List<ReadOnlyKeyValueStore<String, String>> results =
-                wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+                wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore());
         assertEquals(2, results.size());
     }
 
@@ -95,7 +95,7 @@ public class WrappingStoreProviderTest {
     public void shouldReturnAllStoreWhenQueryWithoutPartition() {
         wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));
         final List<ReadOnlyKeyValueStore<String, String>> results =
-                wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+                wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore());
         assertEquals(numStateStorePartitions, results.size());
     }
 
@@ -103,7 +103,7 @@ public class WrappingStoreProviderTest {
     public void shouldReturnSingleStoreWhenQueryWithPartition() {
         wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1));
         final List<ReadOnlyKeyValueStore<String, String>> results =
-                wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+                wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore());
         assertEquals(1, results.size());
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
index dc18c36fff6..4bca7f995a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
@@ -311,7 +311,7 @@ public class RelationalSmokeTest extends SmokeTestUtil {
             final long dataStartTime = System.currentTimeMillis() - timeSpan;
             final long dataEndTime = System.currentTimeMillis();
 
-            // Explicitly create a seed so we can we can log.
+            // Explicitly create a seed so we can log.
             // If we are debugging a failed run, we can deterministically produce the same dataset
             // by plugging in the seed from that run.
             final long seed = new Random().nextLong();
@@ -368,7 +368,7 @@ public class RelationalSmokeTest extends SmokeTestUtil {
          * data distribution: Zipfian and Normal, while also being efficient to generate.
          */
         private static Iterator<Integer> zipfNormal(final Random random, final int keySpace) {
-            return new Iterator<Integer>() {
+            return new Iterator<>() {
                 @Override
                 public boolean hasNext() {
                     return true;
@@ -829,13 +829,13 @@ public class RelationalSmokeTest extends SmokeTestUtil {
                 pass,
                 report,
                 "Expected 1 article, got " + consumedArticles.size(),
-                consumedArticles.size() > 0
+                !consumedArticles.isEmpty()
             );
             assertThat(
                 pass,
                 report,
                 "Expected 1 comment, got " + consumedComments.size(),
-                consumedComments.size() > 0
+                !consumedComments.isEmpty()
             );
 
             assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
index 68ad90babb8..5f9d932e063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 
 import java.time.Duration;
@@ -49,11 +48,8 @@ public class ShutdownDeadlockTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> source = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
 
-        source.foreach(new ForeachAction<String, String>() {
-            @Override
-            public void apply(final String key, final String value) {
-                throw new RuntimeException("KABOOM!");
-            }
+        source.foreach((key, value) -> {
+            throw new RuntimeException("KABOOM!");
         });
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.setUncaughtExceptionHandler(e -> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 5bc679245d7..9ca9db21a95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -174,7 +174,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
             .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
-            .reduce((l, r) -> l + r);
+            .reduce(Integer::sum);
 
         streamify(smallWindowSum, "sws-raw");
         streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 835dae61ed1..7e670802b93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -39,7 +39,7 @@ public class SmokeTestUtil {
     }
 
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
-        return () -> new ContextualProcessor<Object, Object, Void, Void>() {
+        return () -> new ContextualProcessor<>() {
             private int numRecordsProcessed = 0;
             private long smallestOffset = Long.MAX_VALUE;
             private long largestOffset = Long.MIN_VALUE;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index c966b2de1fe..77224e23947 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -93,7 +93,7 @@ public class StreamsBrokerDownResilienceTest {
         final Serde<String> stringSerde = Serdes.String();
 
         builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde))
-            .peek(new ForeachAction<String, String>() {
+            .peek(new ForeachAction<>() {
                 int messagesProcessed = 0;
                 @Override
                 public void apply(final String key, final String value) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index f7be9430d66..26da5bfeb1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -55,7 +55,7 @@ public class StreamsNamedRepartitionTest {
 
         final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
         final String aggregationTopic = (String) (Objects.requireNonNull(streamsProperties.remove("aggregation.topic")));
-        final boolean addOperators = Boolean.valueOf(Objects.requireNonNull((String) streamsProperties.remove("add.operations")));
+        final boolean addOperators = Boolean.parseBoolean(Objects.requireNonNull((String) streamsProperties.remove("add.operations")));
 
 
         final Initializer<Integer> initializer = () -> 0;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 8b049a01ee4..5f19662c0f0 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -50,7 +50,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
     public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
         for (final InternalTopicConfig topic : topics.values()) {
             final String topicName = topic.name();
-            final int numberOfPartitions = topic.numberOfPartitions().get();
+            final int numberOfPartitions = topic.numberOfPartitions().orElseThrow();
             readyTopics.put(topicName, numberOfPartitions);
 
             final List<PartitionInfo> partitions = new ArrayList<>();
diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
index e5e0763bd2a..6c63b0adfdd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
@@ -23,11 +23,6 @@ public class MockValueJoiner {
     public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = instance("+");
 
     public static <V1, V2> ValueJoiner<V1, V2, String> instance(final String separator) {
-        return new ValueJoiner<V1, V2, String>() {
-            @Override
-            public String apply(final V1 value1, final V2 value2) {
-                return value1 + separator + value2;
-            }
-        };
+        return (value1, value2) -> value1 + separator + value2;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java b/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java
index b948abffe27..9f773e7ba64 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java
@@ -25,7 +25,7 @@ public class NoOpValueTransformerWithKeySupplier<K, V> implements ValueTransform
 
     @Override
     public ValueTransformerWithKey<K, V, V> get() {
-        return new ValueTransformerWithKey<K, V, V>() {
+        return new ValueTransformerWithKey<>() {
 
             @Override
             public void init(final ProcessorContext context1) {
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java 
b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index 928566fa779..d7c5936b7d5 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -103,7 +103,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
         }
         final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = subSessionsMap.values().iterator();
         return new KeyValueIteratorStub<>(
-            new Iterator<KeyValue<Windowed<K>, V>>() {
+            new Iterator<>() {
 
                 Iterator<KeyValue<Windowed<K>, V>> it;
 
@@ -155,7 +155,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
 
         final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = subSessionsMap.descendingMap().values().iterator();
         return new KeyValueIteratorStub<>(
-            new Iterator<KeyValue<Windowed<K>, V>>() {
+            new Iterator<>() {
 
                 Iterator<KeyValue<Windowed<K>, V>> it;
 

Reply via email to