This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6224feee656 MINOR: update StandbyTaskCreationIntegrationTest (#16700)
6224feee656 is described below

commit 6224feee656303d66467647f4a71f0a3daa3b2b8
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:27:38 2024 -0700

    MINOR: update StandbyTaskCreationIntegrationTest (#16700)
    
    Refactor test to move off deprecated `transform()` in favor of
    `process()`.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../StandbyTaskCreationIntegrationTest.java        | 35 +++++++++-------------
 1 file changed, 14 insertions(+), 21 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index d2002f1a76e..aa08da6da26 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.ThreadMetadata;
@@ -27,8 +26,8 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -87,20 +86,19 @@ public class StandbyTaskCreationIntegrationTest {
         client2.close();
     }
 
-    private Properties streamsConfiguration(final TestInfo testInfo) {
+    private Properties streamsConfiguration() {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
         streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         return streamsConfiguration;
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo testInfo) throws Exception {
+    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final String stateStoreName = "myTransformState";
         final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
@@ -109,21 +107,16 @@ public class StandbyTaskCreationIntegrationTest {
                                         Serdes.Integer()).withLoggingDisabled();
         builder.addStateStore(keyValueStoreBuilder);
         builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-            .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+            // don't use method reference below as it won't create a new `Processor` instance but re-use the same object
+            .process(() -> new Processor<Integer, Integer, Object, Object>() {
                 @Override
-                public void init(final ProcessorContext context) {}
-
-                @Override
-                public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
-                    return null;
+                public void process(final Record<Integer, Integer> record) {
+                    // no-op
                 }
-
-                @Override
-                public void close() {}
             }, stateStoreName);
 
         final Topology topology = builder.build();
-        createClients(topology, streamsConfiguration(testInfo), topology, streamsConfiguration(testInfo));
+        createClients(topology, streamsConfiguration(), topology, streamsConfiguration());
 
         setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
 
@@ -135,10 +128,10 @@ public class StandbyTaskCreationIntegrationTest {
     }
 
     @Test
-    public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final TestInfo testInfo) throws Exception {
-        final Properties streamsConfiguration1 = streamsConfiguration(testInfo);
+    public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
+        final Properties streamsConfiguration1 = streamsConfiguration();
         streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
-        final Properties streamsConfiguration2 = streamsConfiguration(testInfo);
+        final Properties streamsConfiguration2 = streamsConfiguration();
         streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();

Reply via email to