This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dd824a2e748 KAFKA-19666: Remove old restoration codepath from RestoreIntegrationTest [4/N] (#20498)
dd824a2e748 is described below

commit dd824a2e748d7a61f02d3647a6606b8395b9220d
Author: Shashank <hsshashank.g...@gmail.com>
AuthorDate: Thu Sep 11 07:06:25 2025 -0700

    KAFKA-19666: Remove old restoration codepath from RestoreIntegrationTest [4/N] (#20498)
    
    Clean up `RestoreIntegrationTest.java`
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../integration/RestoreIntegrationTest.java        | 74 ++++++----------------
 1 file changed, 21 insertions(+), 53 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 7370d488757..e85ac344157 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -43,7 +43,6 @@ import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -77,7 +76,6 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,8 +159,8 @@ public class RestoreIntegrationTest {
         CLUSTER.createTopic(inputStream, 2, 1);
     }
 
-    private Properties props(final boolean stateUpdaterEnabled) {
-        return 
props(mkObjectProperties(mkMap(mkEntry(InternalConfig.STATE_UPDATER_ENABLED, 
stateUpdaterEnabled))));
+    private Properties props() {
+        return props(mkObjectProperties(mkMap()));
     }
 
     private Properties props(final Properties extraProperties) {
@@ -267,17 +265,12 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({
-        "true,true",
-        "true,false", 
-        "false,true",
-        "false,false"
-    })
-    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final 
boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+    @ValueSource(booleans = {true, false})
+    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final 
boolean useNewProtocol) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final Topology topology = new Topology();
 
-        final Properties props = props(stateUpdaterEnabled);
+        final Properties props = props();
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
@@ -338,17 +331,12 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({
-        "true,true",
-        "true,false",
-        "false,true",
-        "false,false"
-    })
-    public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean 
stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+    @ValueSource(booleans = {true, false})
+    public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean 
useNewProtocol) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final Properties props = props(stateUpdaterEnabled);
+        final Properties props = props();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
@@ -413,20 +401,15 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({
-        "true,true",
-        "true,false",
-        "false,true",
-        "false,false"
-    })
-    public void shouldRestoreStateFromChangelogTopic(final boolean 
stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+    @ValueSource(booleans = {true, false})
+    public void shouldRestoreStateFromChangelogTopic(final boolean 
useNewProtocol) throws Exception {
         final String changelog = appId + "-store-changelog";
         CLUSTER.createTopic(changelog, 2, 1);
 
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final Properties props = props(stateUpdaterEnabled);
+        final Properties props = props();
 
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
@@ -474,13 +457,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({
-        "true,true",
-        "true,false",
-        "false,true",
-        "false,false"
-    })
-    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean 
stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
+    @ValueSource(booleans = {true, false})
+    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean 
useNewProtocol) throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -490,7 +468,7 @@ public class RestoreIntegrationTest {
                 Integer::sum,
                 Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as("reduce-store").withLoggingDisabled()
             );
-        final Properties props = props(stateUpdaterEnabled);
+        final Properties props = props();
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
@@ -503,13 +481,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({
-        "true,true",
-        "true,false",
-        "false,true",
-        "false,false"
-    })
-    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean 
stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
+    @ValueSource(booleans = {true, false})
+    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean 
useNewProtocol) throws InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
                 asList(KeyValue.pair(1, 1),
                         KeyValue.pair(2, 2),
@@ -537,7 +510,7 @@ public class RestoreIntegrationTest {
 
         final Topology topology = streamsBuilder.build();
 
-        final Properties props = props(stateUpdaterEnabled);
+        final Properties props = props();
 
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
@@ -558,13 +531,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({
-        "true,true",
-        "true,false",
-        "false,true",
-        "false,false"
-    })
-    public void 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final 
boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+    @ValueSource(booleans = {true, false})
+    public void 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final 
boolean useNewProtocol) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(
                 inputStream,
@@ -576,7 +544,7 @@ public class RestoreIntegrationTest {
             CLUSTER.setGroupStandbyReplicas(appId, 1);
         }
 
-        final Properties props1 = props(stateUpdaterEnabled);
+        final Properties props1 = props();
         props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props1.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId + "-1").getPath());
         if (useNewProtocol) {
@@ -585,7 +553,7 @@ public class RestoreIntegrationTest {
         purgeLocalStreamsState(props1);
         final KafkaStreams streams1 = new KafkaStreams(builder.build(), 
props1);
 
-        final Properties props2 = props(stateUpdaterEnabled);
+        final Properties props2 = props();
         props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props2.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId + "-2").getPath());
         if (useNewProtocol) {

Reply via email to