This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f2679bebf6 MINOR: update SuppressionDurabilityIntegrationTest (#16740)
8f2679bebf6 is described below

commit 8f2679bebf6d5cb59065d2ab9b9d5604c72175b3
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:28:09 2024 -0700

    MINOR: update SuppressionDurabilityIntegrationTest (#16740)
    
    Refactor test to move off deprecated `transform()` in favor of `process()`.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../SuppressionDurabilityIntegrationTest.java      | 33 +++++++++-------------
 1 file changed, 14 insertions(+), 19 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 497b98ae354..37655aebce7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -38,9 +37,10 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.TestUtils;
 
@@ -141,12 +141,12 @@ public class SuppressionDurabilityIntegrationTest {
         final MetadataValidator metadataValidator = new MetadataValidator(input);
 
         suppressedCounts
-            .transform(metadataValidator)
+            .process(metadataValidator)
             .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
 
         valueCounts
             .toStream()
-            .transform(metadataValidator)
+            .process(metadataValidator)
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
         final Properties streamsConfig = mkProperties(mkMap(
@@ -252,7 +252,7 @@ public class SuppressionDurabilityIntegrationTest {
         }
     }
 
-    private static final class MetadataValidator implements TransformerSupplier<String, Long, KeyValue<String, Long>> {
+    private static final class MetadataValidator implements ProcessorSupplier<String, Long, String, Long> {
         private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class);
         private final AtomicReference<Throwable> firstException = new AtomicReference<>();
         private final String topic;
@@ -262,29 +262,24 @@ public class SuppressionDurabilityIntegrationTest {
         }
 
         @Override
-        public Transformer<String, Long, KeyValue<String, Long>> get() {
-            return new Transformer<String, Long, KeyValue<String, Long>>() {
-                private ProcessorContext context;
+        public Processor<String, Long, String, Long> get() {
+            return new Processor<String, Long, String, Long>() {
+                private ProcessorContext<String, Long> context;
 
                 @Override
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<String, Long> context) {
                     this.context = context;
                 }
 
                 @Override
-                public KeyValue<String, Long> transform(final String key, final Long value) {
+                public void process(final Record<String, Long> record) {
                     try {
-                        assertThat(context.topic(), equalTo(topic));
+                        assertThat(context.recordMetadata().get().topic(), equalTo(topic));
                     } catch (final Throwable e) {
                         firstException.compareAndSet(null, e);
                         LOG.error("Validation Failed", e);
                     }
-                    return new KeyValue<>(key, value);
-                }
-
-                @Override
-                public void close() {
-
+                    context.forward(record);
                 }
             };
         }

Reply via email to