This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d6a41ac3ca3 MINOR: update EosV2UpgradeIntegrationTest (#16698)
d6a41ac3ca3 is described below

commit d6a41ac3ca3bbd733c3385861306cf04f2661a22
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:26:25 2024 -0700

    MINOR: update EosV2UpgradeIntegrationTest (#16698)
    
    Refactor test to move off deprecated `transform()` in favor of
    `process()`.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../integration/EosV2UpgradeIntegrationTest.java   | 97 ++++++++++------------
 1 file changed, 46 insertions(+), 51 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index 087a439d084..8653f69fca6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -40,9 +40,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -847,7 +847,6 @@ public class EosV2UpgradeIntegrationTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     private KafkaStreams getKafkaStreams(final String appDir,
                                          final String processingGuarantee,
                                          final boolean injectError) {
@@ -861,62 +860,58 @@ public class EosV2UpgradeIntegrationTest {
         builder.addStateStore(storeBuilder);
 
         final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
-        input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
-            @Override
-            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
-                return new Transformer<Long, Long, KeyValue<Long, Long>>() {
-                    ProcessorContext context;
-                    KeyValueStore<Long, Long> state = null;
-                    AtomicBoolean crash;
-                    AtomicInteger sharedCommit;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        this.context = context;
-                        state = context.getStateStore(storeName);
-                        final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
-                        if (APP_DIR_1.equals(clientId)) {
-                            crash = errorInjectedClient1;
-                            sharedCommit = commitCounterClient1;
-                        } else {
-                            crash = errorInjectedClient2;
-                            sharedCommit = commitCounterClient2;
-                        }
+        input.process(() -> new Processor<Long, Long, Long, Long>() {
+                ProcessorContext<Long, Long> context;
+                KeyValueStore<Long, Long> state = null;
+                AtomicBoolean crash;
+                AtomicInteger sharedCommit;
+
+                @Override
+                public void init(final ProcessorContext<Long, Long> context) {
+                    this.context = context;
+                    state = context.getStateStore(storeName);
+                    final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
+                    if (APP_DIR_1.equals(clientId)) {
+                        crash = errorInjectedClient1;
+                        sharedCommit = commitCounterClient1;
+                    } else {
+                        crash = errorInjectedClient2;
+                        sharedCommit = commitCounterClient2;
                     }
+                }
 
-                    @Override
-                    public KeyValue<Long, Long> transform(final Long key, final Long value) {
-                        if ((value + 1) % 10 == 0) {
-                            if (sharedCommit.get() < 0 ||
-                                sharedCommit.incrementAndGet() == 2) {
+                @Override
+                public void process(final Record<Long, Long> record) {
+                    final long key = record.key();
+                    final long value = record.value();
 
-                                context.commit();
-                            }
-                            commitRequested.incrementAndGet();
-                        }
+                    if ((value + 1) % 10 == 0) {
+                        if (sharedCommit.get() < 0 ||
+                            sharedCommit.incrementAndGet() == 2) {
 
-                        Long sum = state.get(key);
-                        if (sum == null) {
-                            sum = value;
-                        } else {
-                            sum += value;
+                            context.commit();
                         }
-                        state.put(key, sum);
-                        state.flush();
+                        commitRequested.incrementAndGet();
+                    }
 
-                        if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
-                            crash != null && crash.compareAndSet(true, false)) {
-                            // only crash a single task
-                            throw new RuntimeException("Injected test exception.");
-                        }
+                    Long sum = state.get(key);
+                    if (sum == null) {
+                        sum = value;
+                    } else {
+                        sum += value;
+                    }
+                    state.put(key, sum);
+                    state.flush();
 
-                        return new KeyValue<>(key, state.get(key));
+                    if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
+                        crash != null && crash.compareAndSet(true, false)) {
+                        // only crash a single task
+                        throw new RuntimeException("Injected test exception.");
                     }
 
-                    @Override
-                    public void close() {}
-                };
-            } }, storeNames)
+                    context.forward(record.withValue(state.get(key)));
+                }
+            }, storeNames)
             .to(MULTI_PARTITION_OUTPUT_TOPIC);
 
         final Properties properties = new Properties();

Reply via email to