This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06392f7ae27 MINOR: Update of the PAPI testing classes to the latest implementation (#12740)
06392f7ae27 is described below

commit 06392f7ae272af33e6789ce48fc840005fa24899
Author: Daan Gerits <[email protected]>
AuthorDate: Fri Feb 23 03:15:24 2024 +0100

    MINOR: Update of the PAPI testing classes to the latest implementation (#12740)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kstream/internals/KStreamBranchTest.java       |  4 +--
 .../kstream/internals/KStreamTransformTest.java    |  4 +--
 .../internals/KStreamTransformValuesTest.java      |  2 +-
 .../internals/KTableTransformValuesTest.java       |  2 +-
 .../java/org/apache/kafka/test/MockProcessor.java  | 33 ++++++++++++----------
 .../org/apache/kafka/test/MockProcessorNode.java   | 15 +++++-----
 .../apache/kafka/test/MockProcessorSupplier.java   | 13 +++++----
 7 files changed, 38 insertions(+), 35 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 8a731f9e474..d1b2477fe69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -59,7 +59,7 @@ public class KStreamBranchTest {
 
         assertEquals(3, branches.length);
 
-        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockProcessorSupplier<>();
         for (final KStream<Integer, String> branch : branches) {
             branch.process(supplier);
         }
@@ -71,7 +71,7 @@ public class KStreamBranchTest {
             }
         }
 
-        final List<MockProcessor<Integer, String>> processors = 
supplier.capturedProcessors(3);
+        final List<MockProcessor<Integer, String, Void, Void>> processors = 
supplier.capturedProcessors(3);
         assertEquals(3, processors.get(0).processed().size());
         assertEquals(1, processors.get(1).processed().size());
         assertEquals(2, processors.get(2).processed().size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 5aad9f07fd1..00b9d10fc98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -74,7 +74,7 @@ public class KStreamTransformTest {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        final MockProcessorSupplier<Integer, Integer> processor = new 
MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, Integer, Void, Void> processor = 
new MockProcessorSupplier<>();
         final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, 
Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
@@ -136,7 +136,7 @@ public class KStreamTransformTest {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        final MockProcessorSupplier<Integer, Integer> processor = new 
MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, Integer, Void, Void> processor = 
new MockProcessorSupplier<>();
         final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, 
Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 05c07b00dbb..0063c655449 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertArrayEquals;
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class KStreamTransformValuesTest {
     private final String topicName = "topic";
-    private final MockProcessorSupplier<Integer, Integer> supplier = new 
MockProcessorSupplier<>();
+    private final MockProcessorSupplier<Integer, Integer, Void, Void> supplier 
= new MockProcessorSupplier<>();
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
     @Mock
     private InternalProcessorContext context;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 0bba2e40672..3138c71cf8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -84,7 +84,7 @@ public class KTableTransformValuesTest {
     private static final Consumed<String, String> CONSUMED = 
Consumed.with(Serdes.String(), Serdes.String());
 
     private TopologyTestDriver driver;
-    private MockProcessorSupplier<String, String> capture;
+    private MockProcessorSupplier<String, String, Void, Void> capture;
     private StreamsBuilder builder;
     @Mock
     private KTableImpl<String, String, String> parent;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index dc2a38938a8..9766d1c0fe8 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -18,8 +18,9 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -28,9 +29,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class MockProcessor<K, V> extends 
org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
-    private final MockApiProcessor<K, V, Object, Object> delegate;
+public class MockProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, 
VIn, KOut, VOut> {
+    private final MockApiProcessor<KIn, VIn, KOut, VOut> delegate;
+
 
     public MockProcessor(final PunctuationType punctuationType,
                          final long scheduleInterval) {
@@ -41,16 +42,14 @@ public class MockProcessor<K, V> extends 
org.apache.kafka.streams.processor.Abst
         delegate = new MockApiProcessor<>();
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public void init(final ProcessorContext context) {
-        super.init(context);
-        
delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, 
Object>) context);
+    public void init(ProcessorContext<KOut, VOut> context) {
+        delegate.init(context);
     }
 
     @Override
-    public void process(final K key, final V value) {
-        delegate.process(new Record<>(key, value, context.timestamp(), 
context.headers()));
+    public void process(Record<KIn, VIn> record) {
+        delegate.process(record);
     }
 
     public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... 
expected) {
@@ -69,7 +68,7 @@ public class MockProcessor<K, V> extends 
org.apache.kafka.streams.processor.Abst
         delegate.checkAndClearPunctuateResult(type, expected);
     }
 
-    public Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey() {
+    public Map<KIn, ValueAndTimestamp<VIn>> lastValueAndTimestampPerKey() {
         return delegate.lastValueAndTimestampPerKey();
     }
 
@@ -81,14 +80,18 @@ public class MockProcessor<K, V> extends 
org.apache.kafka.streams.processor.Abst
         return delegate.scheduleCancellable();
     }
 
-    public ArrayList<KeyValueTimestamp<K, V>> processed() {
+    public ArrayList<KeyValueTimestamp<KIn, VIn>> processed() {
         return delegate.processed();
     }
 
-    @SuppressWarnings("unchecked")
     public void addProcessorMetadata(final String key, final long value) {
-        if (context instanceof InternalProcessorContext) {
-            ((InternalProcessorContext<K, V>) 
context).addProcessorMetadataKeyValue(key, value);
+        if (delegate.context() instanceof InternalProcessorContext) {
+            ((InternalProcessorContext<KOut, VOut>) 
delegate.context()).addProcessorMetadataKeyValue(key, value);
         }
     }
+
+    @Override
+    public void close() {
+        delegate.close();
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 039c831e4df..908641a0e64 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -16,20 +16,20 @@
  */
 package org.apache.kafka.test;
 
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class MockProcessorNode<KIn, VIn, KOut, VOut> extends 
ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private static final String NAME = "MOCK-PROCESS-";
     private static final AtomicInteger INDEX = new AtomicInteger(1);
 
-    public final MockProcessor<KIn, VIn> mockProcessor;
+    public final MockProcessor<KIn, VIn, KOut, VOut> mockProcessor;
 
     public boolean closed;
     public boolean initialized;
@@ -46,9 +46,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends 
ProcessorNode<KIn,
         this(new MockProcessor<>());
     }
 
-    @SuppressWarnings("unchecked")
-    private MockProcessorNode(final MockProcessor<KIn, VIn> mockProcessor) {
-        super(NAME + INDEX.getAndIncrement(), 
ProcessorAdapter.adapt(mockProcessor), Collections.<String>emptySet());
+    private MockProcessorNode(final MockProcessor<KIn, VIn, KOut, VOut> 
mockProcessor) {
+        super(NAME + INDEX.getAndIncrement(), mockProcessor, 
Collections.<String>emptySet());
 
         this.mockProcessor = mockProcessor;
     }
@@ -61,7 +60,7 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends 
ProcessorNode<KIn,
 
     @Override
     public void process(final Record<KIn, VIn> record) {
-        mockProcessor.process(record.key(), record.value());
+        mockProcessor.process(record);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index c6b70f2763d..deab0ac33c4 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -24,11 +25,11 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class MockProcessorSupplier<K, V> implements 
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+public class MockProcessorSupplier<KIn, VIn, KOut, VOut> implements 
org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> {
 
     private final long scheduleInterval;
     private final PunctuationType punctuationType;
-    private final List<MockProcessor<K, V>> processors = new ArrayList<>();
+    private final List<MockProcessor<KIn, VIn, KOut, VOut>> processors = new 
ArrayList<>();
 
     public MockProcessorSupplier() {
         this(-1L);
@@ -44,8 +45,8 @@ public class MockProcessorSupplier<K, V> implements 
org.apache.kafka.streams.pro
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
-        final MockProcessor<K, V> processor = new 
MockProcessor<>(punctuationType, scheduleInterval);
+    public Processor<KIn, VIn, KOut, VOut> get() {
+        final MockProcessor<KIn, VIn, KOut, VOut> processor = new 
MockProcessor<>(punctuationType, scheduleInterval);
 
         // to keep tests simple, ignore calls from ApiUtils.checkSupplier
         if (!StreamsTestUtils.isCheckSupplierCall()) {
@@ -56,7 +57,7 @@ public class MockProcessorSupplier<K, V> implements 
org.apache.kafka.streams.pro
     }
 
     // get the captured processor assuming that only one processor gets 
returned from this supplier
-    public MockProcessor<K, V> theCapturedProcessor() {
+    public MockProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
         return capturedProcessors(1).get(0);
     }
 
@@ -65,7 +66,7 @@ public class MockProcessorSupplier<K, V> implements 
org.apache.kafka.streams.pro
     }
 
         // get the captured processors with the expected number
-    public List<MockProcessor<K, V>> capturedProcessors(final int 
expectedNumberOfProcessors) {
+    public List<MockProcessor<KIn, VIn, KOut, VOut>> capturedProcessors(final 
int expectedNumberOfProcessors) {
         assertEquals(expectedNumberOfProcessors, processors.size());
 
         return processors;

Reply via email to