This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dd824a2e748 KAFKA-19666: Remove old restoration codepath from RestoreIntegrationTest [4/N] (#20498) dd824a2e748 is described below commit dd824a2e748d7a61f02d3647a6606b8395b9220d Author: Shashank <hsshashank.g...@gmail.com> AuthorDate: Thu Sep 11 07:06:25 2025 -0700 KAFKA-19666: Remove old restoration codepath from RestoreIntegrationTest [4/N] (#20498) Clean up `RestoreIntegrationTest.java` Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../integration/RestoreIntegrationTest.java | 74 ++++++---------------- 1 file changed, 21 insertions(+), 53 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 7370d488757..e85ac344157 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -43,7 +43,6 @@ import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -77,7 +76,6 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,8 +159,8 @@ public class RestoreIntegrationTest { CLUSTER.createTopic(inputStream, 2, 1); } - private Properties props(final boolean stateUpdaterEnabled) { - return props(mkObjectProperties(mkMap(mkEntry(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled)))); + private Properties props() { + return props(mkObjectProperties(mkMap())); } private Properties props(final Properties extraProperties) { @@ -267,17 +265,12 @@ public class RestoreIntegrationTest { } @ParameterizedTest - @CsvSource({ - "true,true", - "true,false", - "false,true", - "false,false" - }) - public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean useNewProtocol) throws Exception { final AtomicInteger numReceived = new AtomicInteger(0); final Topology topology = new Topology(); - final Properties props = props(stateUpdaterEnabled); + final Properties props = props(); if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } @@ -338,17 +331,12 @@ public class RestoreIntegrationTest { } @ParameterizedTest - @CsvSource({ - "true,true", - "true,false", - "false,true", - "false,false" - }) - public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNewProtocol) throws Exception { final AtomicInteger numReceived = new AtomicInteger(0); final StreamsBuilder builder = new StreamsBuilder(); - final Properties props = props(stateUpdaterEnabled); + final Properties props = props(); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); @@ -413,20 +401,15 @@ public class RestoreIntegrationTest { } @ParameterizedTest - @CsvSource({ - "true,true", - "true,false", - "false,true", - "false,false" - }) - public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol) throws Exception { final String changelog = appId + "-store-changelog"; CLUSTER.createTopic(changelog, 2, 1); final AtomicInteger numReceived = new AtomicInteger(0); final StreamsBuilder builder = new StreamsBuilder(); - final Properties props = props(stateUpdaterEnabled); + final Properties props = props(); if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); @@ -474,13 +457,8 @@ public class RestoreIntegrationTest { } @ParameterizedTest - @CsvSource({ - "true,true", - "true,false", - "false,true", - "false,false" - }) - public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException { + @ValueSource(booleans = {true, false}) + public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean useNewProtocol) throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); final KStream<Integer, Integer> stream = builder.stream(inputStream); @@ -490,7 +468,7 @@ public class RestoreIntegrationTest { Integer::sum, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled() ); - final Properties props = props(stateUpdaterEnabled); + final Properties props = props(); if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } @@ -503,13 +481,8 @@ public class RestoreIntegrationTest { } @ParameterizedTest - @CsvSource({ - "true,true", - "true,false", - "false,true", - "false,false" - }) - public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException { + @ValueSource(booleans = {true, false}) + public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean useNewProtocol) throws InterruptedException { IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), @@ -537,7 +510,7 @@ public class RestoreIntegrationTest { final Topology topology = streamsBuilder.build(); - final Properties props = props(stateUpdaterEnabled); + final Properties props = props(); if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); @@ -558,13 +531,8 @@ public class RestoreIntegrationTest { } @ParameterizedTest - @CsvSource({ - "true,true", - "true,false", - "false,true", - "false,false" - }) - public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean useNewProtocol) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); builder.table( inputStream, @@ -576,7 +544,7 @@ public class RestoreIntegrationTest { CLUSTER.setGroupStandbyReplicas(appId, 1); } - final Properties props1 = props(stateUpdaterEnabled); + final Properties props1 = props(); props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath()); if (useNewProtocol) { @@ -585,7 +553,7 @@ public class RestoreIntegrationTest { purgeLocalStreamsState(props1); final KafkaStreams streams1 = new KafkaStreams(builder.build(), props1); - final Properties props2 = props(stateUpdaterEnabled); + final Properties props2 = props(); props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath()); if (useNewProtocol) {