This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 988fa3f272d MINOR: Small cleanups in streams tests (#19446) 988fa3f272d is described below commit 988fa3f272da6ab6c93ee01efbc865f74983a76a Author: Dmitry Werner <grimekil...@gmail.com> AuthorDate: Wed Apr 30 04:26:50 2025 +0500 MINOR: Small cleanups in streams tests (#19446) - Fixed typos - Fixed IDEA code inspection warnings - Removed unused fields and methods Reviewers: PoAn Yang <pay...@apache.org>, TengYao Chi <frankvi...@apache.org>, Matthias J. Sax <matth...@confluent.io> --- .../apache/kafka/streams/AutoOffsetResetTest.java | 2 +- .../org/apache/kafka/streams/KafkaStreamsTest.java | 25 +++++---- .../apache/kafka/streams/StreamsBuilderTest.java | 6 +-- .../apache/kafka/streams/StreamsConfigTest.java | 14 ++--- .../kafka/streams/processor/ReadOnlyStoreTest.java | 2 +- .../internals/CopartitionedTopicsEnforcerTest.java | 6 +-- .../internals/DefaultStateUpdaterTest.java | 28 ++-------- .../internals/GlobalStateManagerImplTest.java | 60 +++++++++++----------- .../internals/GlobalStreamThreadTest.java | 4 +- .../internals/InternalTopicManagerTest.java | 22 ++++---- .../internals/InternalTopologyBuilderTest.java | 6 +-- .../processor/internals/NamedTopologyTest.java | 6 +-- .../processor/internals/ProcessorMetadataTest.java | 4 +- .../processor/internals/ProcessorNodeTest.java | 4 +- .../processor/internals/ReadOnlyTaskTest.java | 4 +- .../internals/RepartitionOptimizingTest.java | 4 -- .../RepartitionWithMergeOptimizingTest.java | 4 -- .../processor/internals/StandbyTaskTest.java | 2 +- .../processor/internals/StateDirectoryTest.java | 3 -- .../processor/internals/StateManagerUtilTest.java | 2 +- .../internals/StoreChangelogReaderTest.java | 10 ++-- .../processor/internals/StreamTaskTest.java | 6 +-- .../processor/internals/StreamThreadTest.java | 8 +-- .../internals/StreamsPartitionAssignorTest.java | 18 ------- .../processor/internals/StreamsProducerTest.java | 6 +-- .../processor/internals/TaskManagerTest.java | 4 +- .../internals/assignment/AssignmentTestUtils.java | 2 +- .../assignment/RackAwareTaskAssignorTest.java | 14 ++--- .../assignment/TaskAssignmentUtilsTest.java | 12 ++--- .../assignment/TaskAssignorConvergenceTest.java | 8 +-- .../internals/metrics/StreamsMetricsImplTest.java | 60 +++++++++++----------- .../internals/AbstractRocksDBWindowStoreTest.java | 2 +- .../CachingPersistentSessionStoreTest.java | 10 ++-- .../state/internals/FilteredCacheIteratorTest.java | 16 ++---- .../internals/MonotonicProcessorRecordContext.java | 6 --- .../kafka/streams/state/internals/Murmur3Test.java | 10 ++-- .../state/internals/ReadOnlyWindowStoreStub.java | 12 ++--- ...sToDbOptionsColumnFamilyOptionsAdapterTest.java | 4 +- .../state/internals/WindowStoreFetchTest.java | 3 -- .../state/internals/WrappingStoreProviderTest.java | 6 +-- .../kafka/streams/tests/RelationalSmokeTest.java | 8 +-- .../kafka/streams/tests/ShutdownDeadlockTest.java | 8 +-- .../kafka/streams/tests/SmokeTestClient.java | 2 +- .../apache/kafka/streams/tests/SmokeTestUtil.java | 2 +- .../tests/StreamsBrokerDownResilienceTest.java | 2 +- .../streams/tests/StreamsNamedRepartitionTest.java | 2 +- .../kafka/test/MockInternalTopicManager.java | 2 +- .../org/apache/kafka/test/MockValueJoiner.java | 7 +-- .../test/NoOpValueTransformerWithKeySupplier.java | 2 +- .../kafka/test/ReadOnlySessionStoreStub.java | 4 +- 50 files changed, 190 insertions(+), 274 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java index fb4d9738c93..0ef63c0f857 100644 --- a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java @@ -41,7 +41,7 @@ class AutoOffsetResetTest { } @Test - void shouldThrowExceptionOnDurationForLastetReset() { + void shouldThrowExceptionOnDurationForLatestReset() { final AutoOffsetResetInternal latest = new AutoOffsetResetInternal(AutoOffsetReset.latest()); assertThrows(IllegalStateException.class, latest::duration, "Latest should not have a duration."); } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 1516dfb5eda..499498224bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.UnknownStateStoreException; import org.apache.kafka.streams.internals.StreamsConfigUtils; @@ -345,7 +344,7 @@ public class KafkaStreamsTest { }).when(thread).start(); } - private CountDownLatch terminableThreadBlockingLatch = new CountDownLatch(1); + private final CountDownLatch terminableThreadBlockingLatch = new CountDownLatch(1); private void prepareTerminableThread(final StreamThread thread) throws InterruptedException { doAnswer(invocation -> { @@ -561,7 +560,7 @@ public class KafkaStreamsTest { try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { assertEquals(NUM_THREADS, streams.threads.size()); - assertEquals(streams.state(), KafkaStreams.State.CREATED); + assertEquals(KafkaStreams.State.CREATED, streams.state()); streams.start(); waitForCondition( @@ -620,7 +619,7 @@ public class KafkaStreamsTest { ); streams.close(); - assertEquals(streams.state(), KafkaStreams.State.ERROR, "KafkaStreams should remain in ERROR state after close."); + assertEquals(KafkaStreams.State.ERROR, streams.state(), "KafkaStreams should remain in ERROR state after close."); assertThat(appender.getMessages(), hasItem(containsString("State transition from RUNNING to PENDING_ERROR"))); assertThat(appender.getMessages(), hasItem(containsString("State transition from PENDING_ERROR to ERROR"))); assertThat(appender.getMessages(), hasItem(containsString("Streams client is already in the terminal ERROR state"))); @@ -644,7 +643,7 @@ public class KafkaStreamsTest { streams.start(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); streams.close(); - assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); + assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get()); } } @@ -875,7 +874,7 @@ public class KafkaStreamsTest { prepareThreadState(streamThreadTwo, state2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler(null)); } } @@ -886,7 +885,7 @@ public class KafkaStreamsTest { prepareStreamThread(streamThreadTwo, 2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler(null)); } } @@ -896,7 +895,7 @@ public class KafkaStreamsTest { prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { - assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler(null)); } } @@ -1369,7 +1368,7 @@ public class KafkaStreamsTest { } @Test - public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exception { + public void shouldGetClientSupplierFromConfigForConstructorWithTime() { prepareStreams(); final AtomicReference<StreamThread.State> state1 = prepareStreamThread(streamThreadOne, 1); final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2); @@ -1548,7 +1547,7 @@ public class KafkaStreamsTest { try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { assertThat(streams.threads.size(), equalTo(0)); - assertEquals(streams.state(), KafkaStreams.State.CREATED); + assertEquals(KafkaStreams.State.CREATED, streams.state()); streams.start(); waitForCondition( @@ -1796,7 +1795,7 @@ public class KafkaStreamsTest { final AtomicBoolean didAssertGlobalThread = new AtomicBoolean(false); when(streamThreadOne.clientInstanceIds(any())) - .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl<Uuid>() { + .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl<>() { @Override public Uuid get(final long timeout, final TimeUnit timeUnit) { didAssertThreadOne.set(true); @@ -1806,7 +1805,7 @@ public class KafkaStreamsTest { } })); when(streamThreadTwo.clientInstanceIds(any())) - .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl<Uuid>() { + .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl<>() { @Override public Uuid get(final long timeout, final TimeUnit timeUnit) { didAssertThreadTwo.set(true); @@ -1823,7 +1822,7 @@ public class KafkaStreamsTest { streams.start(); when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any())) - .thenReturn(new KafkaFutureImpl<Uuid>() { + .thenReturn(new KafkaFutureImpl<>() { @Override public Uuid get(final long timeout, final TimeUnit timeUnit) { didAssertGlobalThread.set(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index a6c79806319..b839df53167 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -145,7 +145,7 @@ public class StreamsBuilderTest { ), "topic", Consumed.with(Serdes.String(), Serdes.String()), - () -> new Processor<String, String, Void, Void>() { + () -> new Processor<>() { private KeyValueStore<String, String> store; @Override @@ -454,7 +454,7 @@ public class StreamsBuilderTest { builder.stream(topic) .groupByKey() - .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store")) + .count(Materialized.as("store")) .toStream(); builder.build(); @@ -474,7 +474,7 @@ public class StreamsBuilderTest { builder.stream(topic) .groupByKey() - .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store")) + .count(Materialized.as("store")) .toStream(); builder.build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a7f657ec54b..f081a768815 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -855,7 +855,7 @@ public class StreamsConfigTest { try { new StreamsConfig(props).getProducerConfigs(clientId); - fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer"); + fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be parsed into an integer"); } catch (final ConfigException e) { assertEquals( "Invalid value not-a-number for configuration max.in.flight.requests.per.connection:" + @@ -875,8 +875,8 @@ public class StreamsConfigTest { @Test public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() { final String expectedOptimizeConfig = "none"; - final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG); - assertEquals(expectedOptimizeConfig, actualOptimizedConifig, "Optimization should be \"none\""); + final String actualOptimizedConfig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG); + assertEquals(expectedOptimizeConfig, actualOptimizedConfig, "Optimization should be \"none\""); } @Test @@ -884,8 +884,8 @@ public class StreamsConfigTest { final String expectedOptimizeConfig = "all"; props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all"); final StreamsConfig config = new StreamsConfig(props); - final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG); - assertEquals(expectedOptimizeConfig, actualOptimizedConifig, "Optimization should be \"all\""); + final String actualOptimizedConfig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG); + assertEquals(expectedOptimizeConfig, actualOptimizedConfig, "Optimization should be \"all\""); } @Test @@ -1216,13 +1216,13 @@ public class StreamsConfigTest { } @Test - public void shouldtSetMinTrafficRackAwareAssignmentConfig() { + public void shouldSetMinTrafficRackAwareAssignmentConfig() { props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); assertEquals("min_traffic", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); } @Test - public void shouldtSetBalanceSubtopologyRackAwareAssignmentConfig() { + public void shouldSetBalanceSubtopologyRackAwareAssignmentConfig() { props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY); assertEquals("balance_subtopology", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java index 469dfe04f2b..3b639339902 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java @@ -57,7 +57,7 @@ public class ReadOnlyStoreTest { new StringDeserializer(), "storeTopic", "readOnlyProcessor", - () -> new Processor<Integer, String, Void, Void>() { + () -> new Processor<>() { KeyValueStore<Integer, String> store; @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java index 42fcc363398..82ff3cea85d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsEnforcerTest.java @@ -124,8 +124,8 @@ public class CopartitionedTopicsEnforcerTest { ); final TreeMap<String, Integer> sorted = new TreeMap<>( - Utils.mkMap(Utils.mkEntry(topic1.name(), topic1.numberOfPartitions().get()), - Utils.mkEntry(topic2.name(), topic2.numberOfPartitions().get())) + Utils.mkMap(Utils.mkEntry(topic1.name(), topic1.numberOfPartitions().orElseThrow()), + Utils.mkEntry(topic2.name(), topic2.numberOfPartitions().orElseThrow())) ); assertEquals(String.format("Invalid topology: thread " + @@ -161,7 +161,7 @@ public class CopartitionedTopicsEnforcerTest { assertEquals(String.format("Invalid topology: thread Number of partitions [%s] " + "of repartition topic [%s] " + "doesn't match number of partitions [%s] of the source topic.", - topic1.numberOfPartitions().get(), topic1.name(), 2), ex.getMessage()); + topic1.numberOfPartitions().orElseThrow(), topic1.name(), 2), ex.getMessage()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index adc71ebc116..abb128698a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -203,7 +203,7 @@ class DefaultStateUpdaterTest { verifyRestoredActiveTasks(restoredTask); stateUpdater.shutdown(Duration.ofMinutes(1)); - final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start()); + final IllegalStateException exception = assertThrows(IllegalStateException.class, stateUpdater::start); assertEquals("State updater started with non-empty output queues." + " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the" @@ -220,7 +220,7 @@ class DefaultStateUpdaterTest { verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedTask)); stateUpdater.shutdown(Duration.ofMinutes(1)); - final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start()); + final IllegalStateException exception = assertThrows(IllegalStateException.class, stateUpdater::start); assertEquals("State updater started with non-empty output queues." + " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the" @@ -1785,7 +1785,7 @@ class DefaultStateUpdaterTest { private void verifyIdle() throws Exception { waitForCondition( - () -> stateUpdater.isIdle(), + stateUpdater::isIdle, VERIFICATION_TIMEOUT, "State updater did not enter an idling state!" ); @@ -1864,26 +1864,4 @@ class DefaultStateUpdaterTest { assertFalse(stateUpdater.hasExceptionsAndFailedTasks()); assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); } - - private void verifyRemovedTasks(final Task... tasks) throws Exception { - if (tasks.length == 0) { - waitForCondition( - () -> stateUpdater.removedTasks().isEmpty(), - VERIFICATION_TIMEOUT, - "Did not get empty removed task within the given timeout!" - ); - } else { - final Set<Task> expectedRemovedTasks = Set.of(tasks); - final Set<Task> removedTasks = new HashSet<>(); - waitForCondition( - () -> { - removedTasks.addAll(stateUpdater.removedTasks()); - return removedTasks.containsAll(expectedRemovedTasks) - && removedTasks.size() == expectedRemovedTasks.size(); - }, - VERIFICATION_TIMEOUT, - "Did not get all removed task within the given timeout!" - ); - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 758f8cd500f..74705840de9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; @@ -110,7 +109,6 @@ public class GlobalStateManagerImplTest { private ProcessorTopology topology; private InternalMockProcessorContext processorContext; private Optional<InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?>> optionalMockReprocessFactory; - private DeserializationExceptionHandler deserializationExceptionHandler; static ProcessorTopology withGlobalStores(final List<StateStore> stateStores, final Map<String, String> storeToChangelogTopic, @@ -307,7 +305,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(); stateManager.registerStore( - new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store1) { + new WrappedStateStore<>(store1) { }, stateRestoreCallback, null); @@ -335,7 +333,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(); stateManager.registerStore( - new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store2) { + new WrappedStateStore<>(store2) { }, stateRestoreCallback, null); @@ -424,7 +422,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); - stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) { + stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) { @Override public void flush() { throw new RuntimeException("KABOOM!"); @@ -434,7 +432,7 @@ public class GlobalStateManagerImplTest { } @Test - public void shouldCloseStateStores() throws IOException { + public void shouldCloseStateStores() { stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); @@ -451,7 +449,7 @@ public class GlobalStateManagerImplTest { public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() { stateManager.initialize(); initializeConsumer(1, 0, t1); - stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) { + stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) { @Override public void close() { throw new RuntimeException("KABOOM!"); @@ -476,7 +474,7 @@ public class GlobalStateManagerImplTest { public void shouldNotCloseStoresIfCloseAlreadyCalled() { stateManager.initialize(); initializeConsumer(1, 0, t1); - stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>("t1-store") { + stateManager.registerStore(new NoOpReadOnlyStore<>("t1-store") { @Override public void close() { if (!isOpen()) { @@ -494,7 +492,7 @@ public class GlobalStateManagerImplTest { public void shouldAttemptToCloseAllStoresEvenWhenSomeException() { stateManager.initialize(); initializeConsumer(1, 0, t1); - final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<Object, Object>("t1-store") { + final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<>("t1-store") { @Override public void close() { super.close(); @@ -598,7 +596,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) { numberOfCalls.incrementAndGet(); @@ -634,13 +632,13 @@ public class GlobalStateManagerImplTest { assertThat(cause, instanceOf(TimeoutException.class)); assertThat(cause.getMessage(), equalTo("KABOOM!")); - assertEquals(numberOfCalls.get(), 1); + assertEquals(1, numberOfCalls.get()); } @Test public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) { time.sleep(100L); @@ -675,13 +673,13 @@ public class GlobalStateManagerImplTest { ); assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed.")); - assertEquals(numberOfCalls.get(), 2); + assertEquals(2, numberOfCalls.get()); } @Test public void shouldRetryWhenEndOffsetsThrowsTimeoutExceptionUntilTaskTimeoutExpired() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) { time.sleep(100L); @@ -716,13 +714,13 @@ public class GlobalStateManagerImplTest { ); assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed.")); - assertEquals(numberOfCalls.get(), 11); + assertEquals(11, numberOfCalls.get()); } @Test public void shouldNotFailOnSlowProgressWhenEndOffsetsThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) { time.sleep(1L); @@ -764,7 +762,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotRetryWhenPartitionsForThrowsTimeoutExceptionAndTaskTimeoutIsZero() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List<PartitionInfo> partitionsFor(final String topic) { numberOfCalls.incrementAndGet(); @@ -800,13 +798,13 @@ public class GlobalStateManagerImplTest { assertThat(cause, instanceOf(TimeoutException.class)); assertThat(cause.getMessage(), equalTo("KABOOM!")); - assertEquals(numberOfCalls.get(), 1); + assertEquals(1, numberOfCalls.get()); } @Test public void shouldRetryAtLeastOnceWhenPartitionsForThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List<PartitionInfo> partitionsFor(final String topic) { time.sleep(100L); @@ -841,13 +839,13 @@ public class GlobalStateManagerImplTest { ); assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed.")); - assertEquals(numberOfCalls.get(), 2); + assertEquals(2, numberOfCalls.get()); } @Test public void shouldRetryWhenPartitionsForThrowsTimeoutExceptionUntilTaskTimeoutExpires() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List<PartitionInfo> partitionsFor(final String topic) { time.sleep(100L); @@ -882,13 +880,13 @@ public class GlobalStateManagerImplTest { ); assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed.")); - assertEquals(numberOfCalls.get(), 11); + assertEquals(11, numberOfCalls.get()); } @Test public void shouldNotFailOnSlowProgressWhenPartitionForThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List<PartitionInfo> partitionsFor(final String topic) { time.sleep(1L); @@ -930,7 +928,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotRetryWhenPositionThrowsTimeoutExceptionAndTaskTimeoutIsZero() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { numberOfCalls.incrementAndGet(); @@ -966,13 +964,13 @@ public class GlobalStateManagerImplTest { assertThat(cause, instanceOf(TimeoutException.class)); assertThat(cause.getMessage(), equalTo("KABOOM!")); - assertEquals(numberOfCalls.get(), 1); + assertEquals(1, numberOfCalls.get()); } @Test public void shouldRetryAtLeastOnceWhenPositionThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { time.sleep(100L); @@ -1007,13 +1005,13 @@ public class GlobalStateManagerImplTest { ); assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed.")); - assertEquals(numberOfCalls.get(), 2); + assertEquals(2, numberOfCalls.get()); } @Test public void shouldRetryWhenPositionThrowsTimeoutExceptionUntilTaskTimeoutExpired() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { time.sleep(100L); @@ -1048,13 +1046,13 @@ public class GlobalStateManagerImplTest { ); assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed.")); - assertEquals(numberOfCalls.get(), 11); + assertEquals(11, numberOfCalls.get()); } @Test public void shouldNotFailOnSlowProgressWhenPositionThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { time.sleep(1L); @@ -1090,7 +1088,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore() { - consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) { time.sleep(timeout.toMillis()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index e4f78c900d1..419d6b01013 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -103,7 +103,7 @@ public class GlobalStreamThreadTest { ); final ProcessorSupplier<Object, Object, Void, Void> processorSupplier = () -> - new ContextualProcessor<Object, Object, Void, Void>() { + new ContextualProcessor<>() { @Override public void process(final Record<Object, Object> record) { } @@ -163,7 +163,7 @@ public class GlobalStreamThreadTest { @Test public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception { - final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List<PartitionInfo> partitionsFor(final String topic) { throw new RuntimeException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 14470db2efa..c6d4cf4d239 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -96,7 +96,7 @@ import static org.mockito.Mockito.when; public class InternalTopicManagerTest { private final Node broker1 = new Node(0, "dummyHost-1", 1234); private final Node broker2 = new Node(1, "dummyHost-2", 1234); - private final List<Node> cluster = new ArrayList<Node>(2) { + private final List<Node> cluster = new ArrayList<>(2) { { add(broker1); add(broker2); @@ -115,7 +115,7 @@ public class InternalTopicManagerTest { private InternalTopicManager internalTopicManager; private final MockTime time = new MockTime(0); - private final Map<String, Object> config = new HashMap<String, Object>() { + private final Map<String, Object> config = new HashMap<>() { { put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id"); put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + broker1.port()); @@ -361,8 +361,8 @@ public class InternalTopicManagerTest { final InternalTopicManager internalTopicManager = new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config)); try { - final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1)); - final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2)); + final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1)); + final Set<String> topic2set = new HashSet<>(Collections.singletonList(topic2)); internalTopicManager.getNumPartitions(topic1set, topic2set); @@ -373,8 +373,8 @@ public class InternalTopicManagerTest { mockAdminClient.timeoutNextRequest(1); try { - final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1)); - final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2)); + final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1)); + final Set<String> topic2set = new HashSet<>(Collections.singletonList(topic2)); internalTopicManager.getNumPartitions(topic1set, topic2set); @@ -710,22 +710,22 @@ public class InternalTopicManagerTest { internalTopicManager.makeReady(Collections.singletonMap(topic4, topicConfig4)); assertEquals(Set.of(topic1, topic2, topic3, topic4), mockAdminClient.listTopics().names().get()); - assertEquals(new TopicDescription(topic1, false, new ArrayList<TopicPartitionInfo>() { + assertEquals(new TopicDescription(topic1, false, new ArrayList<>() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } }), mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get()); - assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>() { + assertEquals(new TopicDescription(topic2, false, new ArrayList<>() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } }), mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get()); - assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>() { + assertEquals(new TopicDescription(topic3, false, new ArrayList<>() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } }), mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get()); - assertEquals(new TopicDescription(topic4, false, new ArrayList<TopicPartitionInfo>() { + assertEquals(new TopicDescription(topic4, false, new ArrayList<>() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } @@ -804,7 +804,7 @@ public class InternalTopicManagerTest { mockAdminClient.addTopic( false, topic1, - new ArrayList<TopicPartitionInfo>() { + new ArrayList<>() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 366aa6636d0..86264b59bf7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -116,7 +116,7 @@ public class InternalTopologyBuilderTest { assertThat(builder.offsetResetStrategy(earliestTopic), equalTo(AutoOffsetResetStrategy.EARLIEST)); assertThat(builder.offsetResetStrategy(latestTopic), equalTo(AutoOffsetResetStrategy.LATEST)); assertThat(builder.offsetResetStrategy(durationTopic).type(), equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION)); - assertThat(builder.offsetResetStrategy(durationTopic).duration().get().toSeconds(), equalTo(42L)); + assertThat(builder.offsetResetStrategy(durationTopic).duration().orElseThrow().toSeconds(), equalTo(42L)); } @Test @@ -127,7 +127,7 @@ public class InternalTopologyBuilderTest { final String durationTopicPattern = "duration.*Topic"; builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()), "source0", null, null, null, Pattern.compile(noneTopicPattern)); - builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.earliest()), "sourc1", null, null, null, Pattern.compile(earliestTopicPattern)); + builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source1", null, null, null, Pattern.compile(earliestTopicPattern)); builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null, Pattern.compile(latestTopicPattern)); builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))), "source3", null, null, null, Pattern.compile(durationTopicPattern)); @@ -137,7 +137,7 @@ public class InternalTopologyBuilderTest { assertThat(builder.offsetResetStrategy("earliestTestTopic"), equalTo(AutoOffsetResetStrategy.EARLIEST)); assertThat(builder.offsetResetStrategy("latestTestTopic"), equalTo(AutoOffsetResetStrategy.LATEST)); assertThat(builder.offsetResetStrategy("durationTestTopic").type(), equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION)); - assertThat(builder.offsetResetStrategy("durationTestTopic").duration().get().toSeconds(), equalTo(42L)); + assertThat(builder.offsetResetStrategy("durationTestTopic").duration().orElseThrow().toSeconds(), equalTo(42L)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java index 4c1110bac03..43a59b7057c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java @@ -107,9 +107,9 @@ public class NamedTopologyTest { final NamedTopology topology2 = builder2.build(); final NamedTopology topology3 = builder3.build(); streams.start(asList(topology1, topology2, topology3)); - assertThat(streams.getTopologyByName("topology-1").get(), equalTo(topology1)); - assertThat(streams.getTopologyByName("topology-2").get(), equalTo(topology2)); - assertThat(streams.getTopologyByName("topology-3").get(), equalTo(topology3)); + assertThat(streams.getTopologyByName("topology-1").orElseThrow(), equalTo(topology1)); + assertThat(streams.getTopologyByName("topology-2").orElseThrow(), equalTo(topology2)); + assertThat(streams.getTopologyByName("topology-3").orElseThrow(), equalTo(topology3)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java index b3810092e1e..edd276e850b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorMetadataTest.java @@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ProcessorMetadataTest { @Test - public void shouldAddandGetKeyValueWithEmptyConstructor() { + public void shouldAddAndGetKeyValueWithEmptyConstructor() { final ProcessorMetadata metadata = new ProcessorMetadata(); final String key = "some_key"; final long value = 100L; @@ -46,7 +46,7 @@ public class ProcessorMetadataTest { } @Test - public void shouldAddandGetKeyValueWithExistingMeta() { + public void shouldAddAndGetKeyValueWithExistingMeta() { final Map<String, Long> map = new HashMap<>(); map.put("key1", 1L); map.put("key2", 2L); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 5b4303a1695..86f617e7f34 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -106,7 +106,7 @@ public class ProcessorNodeTest { final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); - assertTrue(failedProcessingException.getCause() instanceof RuntimeException); + assertInstanceOf(RuntimeException.class, failedProcessingException.getCause()); assertEquals("Processing exception should be caught and handled by the processing exception handler.", failedProcessingException.getCause().getMessage()); assertEquals(NAME, failedProcessingException.failedProcessorNodeName()); @@ -310,7 +310,7 @@ public class ProcessorNodeTest { StreamsException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)) ); - assertTrue(se.getCause() instanceof ClassCastException); + assertInstanceOf(ClassCastException.class, se.getCause()); assertTrue(se.getMessage().contains("default Serdes")); assertTrue(se.getMessage().contains("input types")); assertTrue(se.getMessage().contains("pname")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java index 896e84baf5b..c95cf8c5a56 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java @@ -41,7 +41,7 @@ import static org.mockito.Mockito.verify; class ReadOnlyTaskTest { - private final List<String> readOnlyMethods = new LinkedList<String>() { + private final List<String> readOnlyMethods = new LinkedList<>() { { add("needsInitializationOrRestoration"); add("inputPartitions"); @@ -56,7 +56,7 @@ class ReadOnlyTaskTest { } }; - private final List<String> objectMethods = new LinkedList<String>() { + private final List<String> objectMethods = new LinkedList<>() { { add("wait"); add("equals"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 91b95b87a7f..afb89fa3c8f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -50,8 +50,6 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -72,8 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @SuppressWarnings("deprecation") public class RepartitionOptimizingTest { - private static final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class); - private static final String INPUT_TOPIC = "input"; private static final String COUNT_TOPIC = "outputTopic_0"; private static final String AGGREGATION_TOPIC = "outputTopic_1"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java index a9287fc3e59..e62986f4098 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java @@ -42,8 +42,6 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -60,8 +58,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class RepartitionWithMergeOptimizingTest { - private static final Logger log = LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class); - private static final String INPUT_A_TOPIC = "inputA"; private static final String INPUT_B_TOPIC = "inputB"; private static final String COUNT_TOPIC = "outputTopic_0"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 768f3787d0b..6be27187f0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -178,7 +178,7 @@ public class StandbyTaskTest { } @Test - public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException { + public void shouldThrowLockExceptionIfFailedToLockStateDirectory() { stateDirectory = mock(StateDirectory.class); when(stateDirectory.lock(taskId)).thenReturn(false); when(stateManager.taskType()).thenReturn(TaskType.STANDBY); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 8e7ae80af35..f850093d389 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -41,8 +41,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.File; @@ -95,7 +93,6 @@ import static org.junit.jupiter.api.Assertions.fail; public class StateDirectoryTest { - private static final Logger log = LoggerFactory.getLogger(StateDirectoryTest.class); private final MockTime time = new MockTime(); private File stateDir; private final String applicationId = "applicationId"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java index 4bb8ca5b2da..5d0acf46bd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java @@ -184,7 +184,7 @@ public class StateManagerUtilTest { doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); when(stateManager.baseDir()).thenReturn(randomFile); - try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + try (MockedStatic<Utils> ignored = mockStatic(Utils.class)) { assertThrows(ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 320494c5348..69655b4d642 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -389,7 +389,7 @@ public class StoreChangelogReaderTest { adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public long position(final TopicPartition partition) { throw new TimeoutException("KABOOM!"); @@ -674,7 +674,7 @@ public class StoreChangelogReaderTest { when(activeStateManager.taskId()).thenReturn(taskId); final AtomicBoolean clearException = new AtomicBoolean(false); - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public long position(final TopicPartition partition) { if (clearException.get()) { @@ -720,7 +720,7 @@ public class StoreChangelogReaderTest { when(activeStateManager.taskId()).thenReturn(taskId); when(storeMetadata.offset()).thenReturn(10L); - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public long position(final TopicPartition partition) { throw kaboom; @@ -770,7 +770,7 @@ public class StoreChangelogReaderTest { }; adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) { throw new AssertionError("Should not trigger this function"); @@ -928,7 +928,7 @@ public class StoreChangelogReaderTest { @Test public void shouldThrowIfUnsubscribeFail() { - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public void unsubscribe() { throw kaboom; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 98807cd6342..fda9afa9a88 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -766,7 +766,7 @@ public class StreamTaskTest { metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); // Create a processor that only forwards even keys to test the metrics at the source and terminal nodes - final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) { + final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<>(intDeserializer, intDeserializer) { InternalProcessorContext<Integer, Integer> context; @Override @@ -2045,7 +2045,7 @@ public class StreamTaskTest { public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - final Consumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final Consumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) { throw new KafkaException("KABOOM!"); @@ -3109,7 +3109,7 @@ public class StreamTaskTest { singletonList(stateStore), emptyMap()); - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) { throw new TimeoutException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 6fc8b4efb30..8cb2fc8cdfe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -2238,7 +2238,7 @@ public class StreamThreadTest { final List<Long> punctuatedStreamTime = new ArrayList<>(); final List<Long> punctuatedWallClockTime = new ArrayList<>(); final ProcessorSupplier<Object, Object, Void, Void> punctuateProcessor = - () -> new ContextualProcessor<Object, Object, Void, Void>() { + () -> new ContextualProcessor<>() { @Override public void init(final ProcessorContext<Void, Void> context) { context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add); @@ -2506,7 +2506,7 @@ public class StreamThreadTest { if (stateUpdaterEnabled) { TestUtils.waitForCondition( - () -> mockRestoreConsumer.assignment().size() == 0, + () -> mockRestoreConsumer.assignment().isEmpty(), "Never get the assignment"); } else { TestUtils.waitForCondition( @@ -3444,7 +3444,7 @@ public class StreamThreadTest { @ParameterizedTest @MethodSource("data") - public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3560,7 +3560,7 @@ public class StreamThreadTest { @ParameterizedTest @MethodSource("data") - public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { final MockProducer<byte[], byte[]> producer = new MockProducer<>(); producer.setClientInstanceId(Uuid.randomUuid()); producer.injectTimeoutException(-1); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 29fa204a579..b34d1408c56 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; @@ -2819,23 +2818,6 @@ public class StreamsPartitionAssignorTest { return changelogEndOffsets; } - private static Map<String, TopicDescription> getTopicDescriptionMap(final List<String> changelogTopics, - final List<List<TopicPartitionInfo>> topicPartitionInfos) { - if (changelogTopics.size() != topicPartitionInfos.size()) { - throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + - topicPartitionInfos.size() + " different topicPartitionInfo for the topics"); - } - final Map<String, TopicDescription> changeLogTopicDescriptions = new HashMap<>(); - for (int i = 0; i < changelogTopics.size(); i++) { - final String topic = changelogTopics.get(i); - final List<TopicPartitionInfo> topicPartitionInfo = topicPartitionInfos.get(i); - changeLogTopicDescriptions.put(topic, new TopicDescription(topic, false, topicPartitionInfo)); - } - - return changeLogTopicDescriptions; - } - - private static SubscriptionInfo getInfoForOlderVersion(final int version, final ProcessId processId, final Set<TaskId> prevTasks, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 6c6362ee8bc..1c084fa63e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -920,8 +920,8 @@ public class StreamsProducerTest { METADATA_WAIT_TIME; assertThat(eosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked)); final long closeStart = 1L; - final long clodeDelay = 1L; - when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + clodeDelay); + final long closeDelay = 1L; + when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + closeDelay); eosStreamsProducer.resetProducer(eosMockProducer); setProducerMetrics( eosMockProducer, @@ -937,7 +937,7 @@ public class StreamsProducerTest { assertThat( eosStreamsProducer.totalBlockedTime(), - closeTo(2 * expectedTotalBlocked + clodeDelay, 0.01) + closeTo(2 * expectedTotalBlocked + closeDelay, 0.01) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index d8bb35c000a..68abf455a76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1678,7 +1678,7 @@ public class TaskManagerTest { ); assertEquals(exception, thrown); - assertEquals(statefulTask.id(), thrown.taskId().get()); + assertEquals(statefulTask.id(), thrown.taskId().orElseThrow()); } @Test @@ -2149,7 +2149,7 @@ public class TaskManagerTest { } @Test - public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { + public void shouldCloseActiveTasksWhenHandlingLostTasks() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 7c7047b7516..206a6cc796d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -523,7 +523,7 @@ public final class AssignmentTestUtils { static <V> Matcher<ClientState> hasProperty(final String propertyName, final Function<ClientState, V> propertyExtractor, final V propertyValue) { - return new BaseMatcher<ClientState>() { + return new BaseMatcher<>() { @Override public void describeTo(final Description description) { description.appendText(propertyName).appendText(":").appendValue(propertyValue); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java index 381ae972aec..ccc904b5e81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java @@ -826,7 +826,7 @@ public class RackAwareTaskAssignorTest { @ParameterizedTest @MethodSource("paramStoreType") - public void shouldThrowIfMissingCallcanEnableRackAwareAssignor(final boolean stateful, final String assignmentStrategy) { + public void shouldThrowIfMissingCallCanEnableRackAwareAssignor(final boolean stateful, final String assignmentStrategy) { setUp(stateful); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterForAllTopics(), @@ -1125,28 +1125,28 @@ public class RackAwareTaskAssignorTest { setUp(stateful); final int nodeSize = 50; final int tpSize = 60; - final int partionSize = 3; + final int partitionSize = 3; final int clientSize = 50; final int replicaCount = 3; final int maxCapacity = 3; final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = getTaskTopicPartitionMap( - tpSize, partionSize, false); + tpSize, partitionSize, false); final AssignmentConfigs assignorConfiguration = getRackAwareEnabledConfigWithStandby(replicaCount, assignmentStrategy); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getRandomCluster(nodeSize, tpSize, partionSize), + getRandomCluster(nodeSize, tpSize, partitionSize), taskTopicPartitionMap, - getTaskTopicPartitionMap(tpSize, partionSize, true), + getTaskTopicPartitionMap(tpSize, partitionSize, true), getTopologyGroupTaskMap(), getRandomProcessRacks(clientSize, nodeSize), - mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partionSize), + mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partitionSize), assignorConfiguration, time ); final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet(); final SortedMap<ProcessId, ClientState> clientStateMap = getRandomClientState(clientSize, - tpSize, partionSize, maxCapacity, taskIds); + tpSize, partitionSize, maxCapacity, taskIds); final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create( assignorConfiguration, assignor); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java index a42f4396041..2aba4da020a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java @@ -544,13 +544,11 @@ public class TaskAssignmentUtilsTest { new TopicPartition(String.format("test-topic-%d", taskId.subtopology()), taskId.partition()), true, true, - () -> { - partitions.forEach(partition -> { - if (partition != null && rackIds != null) { - partition.annotateWithRackIds(rackIds); - } - }); - } + () -> partitions.forEach(partition -> { + if (partition != null && rackIds != null) { + partition.annotateWithRackIds(rackIds); + } + }) )); return mkEntry( taskId, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java index 936c9eb6a85..39d687f3cdc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java @@ -425,7 +425,7 @@ public class TaskAssignorConvergenceTest { testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 1); verifyValidAssignment(numStandbyReplicas, harness); - // min-cost rack aware assignor doesn't balance subtopolgy + // min-cost rack aware assignor doesn't balance subtopology if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { verifyBalancedAssignment(harness, skewThreshold); } @@ -463,7 +463,7 @@ public class TaskAssignorConvergenceTest { verifyValidAssignment(numStandbyReplicas, harness); - // min-cost rack aware assignor doesn't balance subtopolgy + // min-cost rack aware assignor doesn't balance subtopology if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { verifyBalancedAssignment(harness, skewThreshold); } @@ -520,7 +520,7 @@ public class TaskAssignorConvergenceTest { testForConvergence(harness, configs, 1); verifyValidAssignment(numStandbyReplicas, harness); - // min-cost rack aware assignor doesn't balance subtopolgy + // min-cost rack aware assignor doesn't balance subtopology if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { verifyBalancedAssignment(harness, skewThreshold); } @@ -540,7 +540,7 @@ public class TaskAssignorConvergenceTest { if (!harness.clientStates.isEmpty()) { testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas)); verifyValidAssignment(numStandbyReplicas, harness); - // min-cost rack aware assignor doesn't balance subtopolgy + // min-cost rack aware assignor doesn't balance subtopology if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { verifyBalancedAssignment(harness, skewThreshold); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index c6bf68e4df5..bdb9d028c54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -99,9 +99,7 @@ public class StreamsMetricsImplTest { private static final String TASK_ID1 = "test-task-1"; private static final String TASK_ID2 = "test-task-2"; private static final String NODE_ID1 = "test-node-1"; - private static final String NODE_ID2 = "test-node-2"; private static final String TOPIC_ID1 = "test-topic-1"; - private static final String TOPIC_ID2 = "test-topic-2"; private static final String METRIC_NAME1 = "test-metric1"; private static final String METRIC_NAME2 = "test-metric2"; private static final String THREAD_ID_TAG = "thread-id"; @@ -236,8 +234,7 @@ public class StreamsMetricsImplTest { return sensorKeys; } - private ArgumentCaptor<String> setupGetNewSensorTest(final Metrics metrics, - final RecordingLevel recordingLevel) { + private ArgumentCaptor<String> setupGetNewSensorTest(final Metrics metrics) { final ArgumentCaptor<String> sensorKey = ArgumentCaptor.forClass(String.class); when(metrics.getSensor(sensorKey.capture())).thenReturn(null); final Sensor[] parents = {}; @@ -253,7 +250,7 @@ public class StreamsMetricsImplTest { public void shouldGetNewThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - setupGetNewSensorTest(metrics, recordingLevel); + setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -277,7 +274,7 @@ public class StreamsMetricsImplTest { public void shouldGetNewTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - setupGetNewSensorTest(metrics, recordingLevel); + setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( @@ -311,7 +308,7 @@ public class StreamsMetricsImplTest { public void shouldGetNewTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - setupGetNewSensorTest(metrics, recordingLevel); + setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( @@ -349,7 +346,7 @@ public class StreamsMetricsImplTest { public void shouldGetNewStoreLevelSensorIfNoneExists() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics, recordingLevel); + final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( @@ -477,10 +474,11 @@ public class StreamsMetricsImplTest { final MetricName metricName = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); - final Metrics metrics = new Metrics(metricConfig); - assertNull(metrics.metric(metricName)); - metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER); - assertNotNull(metrics.metric(metricName)); + try (Metrics metrics = new Metrics(metricConfig)) { + assertNull(metrics.metric(metricName)); + metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER); + assertNotNull(metrics.metric(metricName)); + } } @Test @@ -509,10 +507,11 @@ public class StreamsMetricsImplTest { final MetricName metricName = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); - final Metrics metrics = new Metrics(metricConfig); - assertNull(metrics.metric(metricName)); - final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER); - assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)); + try (Metrics metrics = new Metrics(metricConfig)) { + assertNull(metrics.metric(metricName)); + final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER); + assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)); + } } @Test @@ -520,20 +519,21 @@ public class StreamsMetricsImplTest { final MetricName metricName = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); - final Metrics metrics = new Metrics(metricConfig); - assertNull(metrics.metric(metricName)); - final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>(); - final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>(); + try (Metrics metrics = new Metrics(metricConfig)) { + assertNull(metrics.metric(metricName)); + final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>(); + final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>(); - final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER))); - final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER))); + final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER))); + final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER))); - thread1.start(); - thread2.start(); + thread1.start(); + thread2.start(); - thread1.join(); - thread2.join(); - assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get()); + thread1.join(); + thread2.join(); + assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get()); + } } @Test @@ -561,7 +561,7 @@ public class StreamsMetricsImplTest { public void shouldGetNewNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - setupGetNewSensorTest(metrics, recordingLevel); + setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( @@ -598,7 +598,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; - setupGetNewSensorTest(metrics, recordingLevel); + setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( @@ -634,7 +634,7 @@ public class StreamsMetricsImplTest { public void shouldGetNewClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - setupGetNewSensorTest(metrics, recordingLevel); + setupGetNewSensorTest(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java index 7760d32e431..50964febf63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java @@ -529,7 +529,7 @@ public abstract class AbstractRocksDBWindowStoreTest extends AbstractWindowBytes } // the latest record has a timestamp > 60k. So, the +1 in actualFrom calculation in - // RocksDbWindowStore shouldn't have an implciation and all stores should return the same fetched counts. + // RocksDbWindowStore shouldn't have an implication and all stores should return the same fetched counts. assertEquals(1, fetchedCount); assertEquals( Set.of(segments.segmentName(3L), segments.segmentName(5L)), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 0db31dfe039..159503b920a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -879,13 +879,13 @@ public class CachingPersistentSessionStoreTest { public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> { private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDesializer; + private final Deserializer<V> valueDeserializer; private final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>(); CacheFlushListenerStub(final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDesializer) { + final Deserializer<V> valueDeserializer) { this.keyDeserializer = keyDeserializer; - this.valueDesializer = valueDesializer; + this.valueDeserializer = valueDeserializer; } @Override @@ -894,8 +894,8 @@ public class CachingPersistentSessionStoreTest { new KeyValueTimestamp<>( keyDeserializer.deserialize(null, record.key()), new Change<>( - valueDesializer.deserialize(null, record.value().newValue), - valueDesializer.deserialize(null, record.value().oldValue)), + valueDeserializer.deserialize(null, record.value().newValue), + valueDeserializer.deserialize(null, record.value().oldValue)), record.timestamp() ) ); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java index 9c8fd2ce5f3..ae59fdbf214 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.GenericInMemoryKeyValueStore; @@ -27,6 +26,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Collections; +import java.util.Iterator; import java.util.List; import static java.util.Arrays.asList; @@ -67,22 +67,12 @@ public class FilteredCacheIteratorTest { @BeforeEach public void before() { store.putAll(entries); - final HasNextCondition allCondition = new HasNextCondition() { - @Override - public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { - return iterator.hasNext(); - } - }; + final HasNextCondition allCondition = Iterator::hasNext; allIterator = new FilteredCacheIterator( new DelegatingPeekingKeyValueIterator<>("", store.all()), allCondition, IDENTITY_FUNCTION); - final HasNextCondition firstEntryCondition = new HasNextCondition() { - @Override - public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { - return iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key); - } - }; + final HasNextCondition firstEntryCondition = iterator -> iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key); firstEntryIterator = new FilteredCacheIterator( new DelegatingPeekingKeyValueIterator<>("", store.all()), firstEntryCondition, IDENTITY_FUNCTION); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java index ec1814ebd9c..2a57c36659d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java @@ -42,10 +42,4 @@ public class MonotonicProcessorRecordContext extends ProcessorRecordContext { } return ret; } - - public void kick() { - if (!automatic) { - counter++; - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java index b583148ab47..670a980d9ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java @@ -44,8 +44,8 @@ public class Murmur3Test { cases.put(new byte[]{'a', 'b', 'c'}, 461137560); int seed = 123; - for (Map.Entry c : cases.entrySet()) { - byte[] b = (byte[]) c.getKey(); + for (Map.Entry<byte[], Integer> c : cases.entrySet()) { + byte[] b = c.getKey(); assertEquals(c.getValue(), Murmur3.hash32(b, b.length, seed)); } } @@ -62,10 +62,10 @@ public class Murmur3Test { int seed = 123; - for (Map.Entry c : cases.entrySet()) { - byte[] b = (byte[]) c.getKey(); + for (Map.Entry<byte[], long[]> c : cases.entrySet()) { + byte[] b = c.getKey(); long[] result = Murmur3.hash128(b, 0, b.length, seed); - assertArrayEquals((long[]) c.getValue(), result); + assertArrayEquals(c.getValue(), result); } } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index c574950ac90..25d7fa3a68b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -110,7 +110,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); - return new KeyValueIterator<Windowed<K>, V>() { + return new KeyValueIterator<>() { @Override public void close() { } @@ -149,7 +149,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); - return new KeyValueIterator<Windowed<K>, V>() { + return new KeyValueIterator<>() { @Override public void close() { } @@ -191,7 +191,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); - return new KeyValueIterator<Windowed<K>, V>() { + return new KeyValueIterator<>() { @Override public void close() { } @@ -235,7 +235,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); - return new KeyValueIterator<Windowed<K>, V>() { + return new KeyValueIterator<>() { @Override public void close() { } @@ -286,7 +286,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); - return new KeyValueIterator<Windowed<K>, V>() { + return new KeyValueIterator<>() { @Override public void close() { } @@ -342,7 +342,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator(); - return new KeyValueIterator<Windowed<K>, V>() { + return new KeyValueIterator<>() { @Override public void close() { } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 08248b02054..3ce8cb63efc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -81,7 +81,7 @@ import static org.mockito.Mockito.mockingDetails; @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { - private final List<String> walRelatedMethods = new LinkedList<String>() { + private final List<String> walRelatedMethods = new LinkedList<>() { { add("setManualWalFlush"); add("setMaxTotalWalSize"); @@ -94,7 +94,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { } }; - private final List<String> ignoreMethods = new LinkedList<String>() { + private final List<String> ignoreMethods = new LinkedList<>() { { add("isOwningHandle"); add("getNativeHandle"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java index bdcdd2d6d8c..08391465a5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; @@ -88,8 +87,6 @@ public class WindowStoreFetchTest { private String innerLowBetween; private String innerHighBetween; - private TimeWindowedKStream<String, String> windowedStream; - public void setup(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index dce7d417c09..ab868b672d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -65,7 +65,7 @@ public class WrappingStoreProviderTest { @Test public void shouldFindKeyValueStores() { final List<ReadOnlyKeyValueStore<String, String>> results = - wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()); + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); assertEquals(2, results.size()); } @@ -95,7 +95,7 @@ public class WrappingStoreProviderTest { public void shouldReturnAllStoreWhenQueryWithoutPartition() { wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore())); final List<ReadOnlyKeyValueStore<String, String>> results = - wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()); + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); assertEquals(numStateStorePartitions, results.size()); } @@ -103,7 +103,7 @@ public class WrappingStoreProviderTest { public void shouldReturnSingleStoreWhenQueryWithPartition() { wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1)); final List<ReadOnlyKeyValueStore<String, String>> results = - wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()); + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); assertEquals(1, results.size()); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java index dc18c36fff6..4bca7f995a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java @@ -311,7 +311,7 @@ public class RelationalSmokeTest extends SmokeTestUtil { final long dataStartTime = System.currentTimeMillis() - timeSpan; final long dataEndTime = System.currentTimeMillis(); - // Explicitly create a seed so we can we can log. + // Explicitly create a seed so we can log. // If we are debugging a failed run, we can deterministically produce the same dataset // by plugging in the seed from that run. final long seed = new Random().nextLong(); @@ -368,7 +368,7 @@ public class RelationalSmokeTest extends SmokeTestUtil { * data distribution: Zipfian and Normal, while also being efficient to generate. */ private static Iterator<Integer> zipfNormal(final Random random, final int keySpace) { - return new Iterator<Integer>() { + return new Iterator<>() { @Override public boolean hasNext() { return true; @@ -829,13 +829,13 @@ public class RelationalSmokeTest extends SmokeTestUtil { pass, report, "Expected 1 article, got " + consumedArticles.size(), - consumedArticles.size() > 0 + !consumedArticles.isEmpty() ); assertThat( pass, report, "Expected 1 comment, got " + consumedComments.size(), - consumedComments.size() > 0 + !consumedComments.isEmpty() ); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java index 68ad90babb8..5f9d932e063 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import java.time.Duration; @@ -49,11 +48,8 @@ public class ShutdownDeadlockTest { final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> source = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); - source.foreach(new ForeachAction<String, String>() { - @Override - public void apply(final String key, final String value) { - throw new RuntimeException("KABOOM!"); - } + source.foreach((key, value) -> { + throw new RuntimeException("KABOOM!"); }); final KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.setUncaughtExceptionHandler(e -> { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 5bc679245d7..9ca9db21a95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -174,7 +174,7 @@ public class SmokeTestClient extends SmokeTestUtil { final KTable<Windowed<String>, Integer> smallWindowSum = groupedData .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1))) - .reduce((l, r) -> l + r); + .reduce(Integer::sum); streamify(smallWindowSum, "sws-raw"); streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index 835dae61ed1..7e670802b93 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -39,7 +39,7 @@ public class SmokeTestUtil { } static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) { - return () -> new ContextualProcessor<Object, Object, Void, Void>() { + return () -> new ContextualProcessor<>() { private int numRecordsProcessed = 0; private long smallestOffset = Long.MAX_VALUE; private long largestOffset = Long.MIN_VALUE; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index c966b2de1fe..77224e23947 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -93,7 +93,7 @@ public class StreamsBrokerDownResilienceTest { final Serde<String> stringSerde = Serdes.String(); builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde)) - .peek(new ForeachAction<String, String>() { + .peek(new ForeachAction<>() { int messagesProcessed = 0; @Override public void apply(final String key, final String value) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index f7be9430d66..26da5bfeb1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -55,7 +55,7 @@ public class StreamsNamedRepartitionTest { final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic"))); final String aggregationTopic = (String) (Objects.requireNonNull(streamsProperties.remove("aggregation.topic"))); - final boolean addOperators = Boolean.valueOf(Objects.requireNonNull((String) streamsProperties.remove("add.operations"))); + final boolean addOperators = Boolean.parseBoolean(Objects.requireNonNull((String) streamsProperties.remove("add.operations"))); final Initializer<Integer> initializer = () -> 0; diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java index 8b049a01ee4..5f19662c0f0 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java @@ -50,7 +50,7 @@ public class MockInternalTopicManager extends InternalTopicManager { public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) { for (final InternalTopicConfig topic : topics.values()) { final String topicName = topic.name(); - final int numberOfPartitions = topic.numberOfPartitions().get(); + final int numberOfPartitions = topic.numberOfPartitions().orElseThrow(); readyTopics.put(topicName, numberOfPartitions); final List<PartitionInfo> partitions = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java index e5e0763bd2a..6c63b0adfdd 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java +++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java @@ -23,11 +23,6 @@ public class MockValueJoiner { public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = instance("+"); public static <V1, V2> ValueJoiner<V1, V2, String> instance(final String separator) { - return new ValueJoiner<V1, V2, String>() { - @Override - public String apply(final V1 value1, final V2 value2) { - return value1 + separator + value2; - } - }; + return (value1, value2) -> value1 + separator + value2; } } diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java b/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java index b948abffe27..9f773e7ba64 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpValueTransformerWithKeySupplier.java @@ -25,7 +25,7 @@ public class NoOpValueTransformerWithKeySupplier<K, V> implements ValueTransform @Override public ValueTransformerWithKey<K, V, V> get() { - return new ValueTransformerWithKey<K, V, V>() { + return new ValueTransformerWithKey<>() { @Override public void init(final ProcessorContext context1) { diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 928566fa779..d7c5936b7d5 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -103,7 +103,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V } final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = subSessionsMap.values().iterator(); return new KeyValueIteratorStub<>( - new Iterator<KeyValue<Windowed<K>, V>>() { + new Iterator<>() { Iterator<KeyValue<Windowed<K>, V>> it; @@ -155,7 +155,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = subSessionsMap.descendingMap().values().iterator(); return new KeyValueIteratorStub<>( - new Iterator<KeyValue<Windowed<K>, V>>() { + new Iterator<>() { Iterator<KeyValue<Windowed<K>, V>> it;