This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06392f7ae27 MINOR: Update of the PAPI testing classes to the latest
implementation (#12740)
06392f7ae27 is described below
commit 06392f7ae272af33e6789ce48fc840005fa24899
Author: Daan Gerits <[email protected]>
AuthorDate: Fri Feb 23 03:15:24 2024 +0100
MINOR: Update of the PAPI testing classes to the latest implementation
(#12740)
Reviewers: Matthias J. Sax <[email protected]>
---
.../kstream/internals/KStreamBranchTest.java | 4 +--
.../kstream/internals/KStreamTransformTest.java | 4 +--
.../internals/KStreamTransformValuesTest.java | 2 +-
.../internals/KTableTransformValuesTest.java | 2 +-
.../java/org/apache/kafka/test/MockProcessor.java | 33 ++++++++++++----------
.../org/apache/kafka/test/MockProcessorNode.java | 15 +++++-----
.../apache/kafka/test/MockProcessorSupplier.java | 13 +++++----
7 files changed, 38 insertions(+), 35 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 8a731f9e474..d1b2477fe69 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -59,7 +59,7 @@ public class KStreamBranchTest {
assertEquals(3, branches.length);
- final MockProcessorSupplier<Integer, String> supplier = new
MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String, Void, Void> supplier =
new MockProcessorSupplier<>();
for (final KStream<Integer, String> branch : branches) {
branch.process(supplier);
}
@@ -71,7 +71,7 @@ public class KStreamBranchTest {
}
}
- final List<MockProcessor<Integer, String>> processors =
supplier.capturedProcessors(3);
+ final List<MockProcessor<Integer, String, Void, Void>> processors =
supplier.capturedProcessors(3);
assertEquals(3, processors.get(0).processed().size());
assertEquals(1, processors.get(1).processed().size());
assertEquals(2, processors.get(2).processed().size());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 5aad9f07fd1..00b9d10fc98 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -74,7 +74,7 @@ public class KStreamTransformTest {
final int[] expectedKeys = {1, 10, 100, 1000};
- final MockProcessorSupplier<Integer, Integer> processor = new
MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, Integer, Void, Void> processor =
new MockProcessorSupplier<>();
final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME,
Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(transformerSupplier).process(processor);
@@ -136,7 +136,7 @@ public class KStreamTransformTest {
final int[] expectedKeys = {1, 10, 100, 1000};
- final MockProcessorSupplier<Integer, Integer> processor = new
MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, Integer, Void, Void> processor =
new MockProcessorSupplier<>();
final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME,
Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(transformerSupplier).process(processor);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 05c07b00dbb..0063c655449 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertArrayEquals;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KStreamTransformValuesTest {
private final String topicName = "topic";
- private final MockProcessorSupplier<Integer, Integer> supplier = new
MockProcessorSupplier<>();
+ private final MockProcessorSupplier<Integer, Integer, Void, Void> supplier
= new MockProcessorSupplier<>();
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
@Mock
private InternalProcessorContext context;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 0bba2e40672..3138c71cf8b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -84,7 +84,7 @@ public class KTableTransformValuesTest {
private static final Consumed<String, String> CONSUMED =
Consumed.with(Serdes.String(), Serdes.String());
private TopologyTestDriver driver;
- private MockProcessorSupplier<String, String> capture;
+ private MockProcessorSupplier<String, String, Void, Void> capture;
private StreamsBuilder builder;
@Mock
private KTableImpl<String, String, String> parent;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index dc2a38938a8..9766d1c0fe8 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -18,8 +18,9 @@ package org.apache.kafka.test;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -28,9 +29,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class MockProcessor<K, V> extends
org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
- private final MockApiProcessor<K, V, Object, Object> delegate;
+public class MockProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn,
VIn, KOut, VOut> {
+ private final MockApiProcessor<KIn, VIn, KOut, VOut> delegate;
+
public MockProcessor(final PunctuationType punctuationType,
final long scheduleInterval) {
@@ -41,16 +42,14 @@ public class MockProcessor<K, V> extends
org.apache.kafka.streams.processor.Abst
delegate = new MockApiProcessor<>();
}
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
- super.init(context);
-
delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object,
Object>) context);
+ public void init(ProcessorContext<KOut, VOut> context) {
+ delegate.init(context);
}
@Override
- public void process(final K key, final V value) {
- delegate.process(new Record<>(key, value, context.timestamp(),
context.headers()));
+ public void process(Record<KIn, VIn> record) {
+ delegate.process(record);
}
public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>...
expected) {
@@ -69,7 +68,7 @@ public class MockProcessor<K, V> extends
org.apache.kafka.streams.processor.Abst
delegate.checkAndClearPunctuateResult(type, expected);
}
- public Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey() {
+ public Map<KIn, ValueAndTimestamp<VIn>> lastValueAndTimestampPerKey() {
return delegate.lastValueAndTimestampPerKey();
}
@@ -81,14 +80,18 @@ public class MockProcessor<K, V> extends
org.apache.kafka.streams.processor.Abst
return delegate.scheduleCancellable();
}
- public ArrayList<KeyValueTimestamp<K, V>> processed() {
+ public ArrayList<KeyValueTimestamp<KIn, VIn>> processed() {
return delegate.processed();
}
- @SuppressWarnings("unchecked")
public void addProcessorMetadata(final String key, final long value) {
- if (context instanceof InternalProcessorContext) {
- ((InternalProcessorContext<K, V>)
context).addProcessorMetadataKeyValue(key, value);
+ if (delegate.context() instanceof InternalProcessorContext) {
+ ((InternalProcessorContext<KOut, VOut>)
delegate.context()).addProcessorMetadataKeyValue(key, value);
}
}
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 039c831e4df..908641a0e64 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -16,20 +16,20 @@
*/
package org.apache.kafka.test;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class MockProcessorNode<KIn, VIn, KOut, VOut> extends
ProcessorNode<KIn, VIn, KOut, VOut> {
private static final String NAME = "MOCK-PROCESS-";
private static final AtomicInteger INDEX = new AtomicInteger(1);
- public final MockProcessor<KIn, VIn> mockProcessor;
+ public final MockProcessor<KIn, VIn, KOut, VOut> mockProcessor;
public boolean closed;
public boolean initialized;
@@ -46,9 +46,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends
ProcessorNode<KIn,
this(new MockProcessor<>());
}
- @SuppressWarnings("unchecked")
- private MockProcessorNode(final MockProcessor<KIn, VIn> mockProcessor) {
- super(NAME + INDEX.getAndIncrement(),
ProcessorAdapter.adapt(mockProcessor), Collections.<String>emptySet());
+ private MockProcessorNode(final MockProcessor<KIn, VIn, KOut, VOut>
mockProcessor) {
+ super(NAME + INDEX.getAndIncrement(), mockProcessor,
Collections.<String>emptySet());
this.mockProcessor = mockProcessor;
}
@@ -61,7 +60,7 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends
ProcessorNode<KIn,
@Override
public void process(final Record<KIn, VIn> record) {
- mockProcessor.process(record.key(), record.value());
+ mockProcessor.process(record);
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index c6b70f2763d..deab0ac33c4 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
import java.util.ArrayList;
import java.util.List;
@@ -24,11 +25,11 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class MockProcessorSupplier<K, V> implements
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+public class MockProcessorSupplier<KIn, VIn, KOut, VOut> implements
org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> {
private final long scheduleInterval;
private final PunctuationType punctuationType;
- private final List<MockProcessor<K, V>> processors = new ArrayList<>();
+ private final List<MockProcessor<KIn, VIn, KOut, VOut>> processors = new
ArrayList<>();
public MockProcessorSupplier() {
this(-1L);
@@ -44,8 +45,8 @@ public class MockProcessorSupplier<K, V> implements
org.apache.kafka.streams.pro
}
@Override
- public org.apache.kafka.streams.processor.Processor<K, V> get() {
- final MockProcessor<K, V> processor = new
MockProcessor<>(punctuationType, scheduleInterval);
+ public Processor<KIn, VIn, KOut, VOut> get() {
+ final MockProcessor<KIn, VIn, KOut, VOut> processor = new
MockProcessor<>(punctuationType, scheduleInterval);
// to keep tests simple, ignore calls from ApiUtils.checkSupplier
if (!StreamsTestUtils.isCheckSupplierCall()) {
@@ -56,7 +57,7 @@ public class MockProcessorSupplier<K, V> implements
org.apache.kafka.streams.pro
}
// get the captured processor assuming that only one processor gets
returned from this supplier
- public MockProcessor<K, V> theCapturedProcessor() {
+ public MockProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
return capturedProcessors(1).get(0);
}
@@ -65,7 +66,7 @@ public class MockProcessorSupplier<K, V> implements
org.apache.kafka.streams.pro
}
// get the captured processors with the expected number
- public List<MockProcessor<K, V>> capturedProcessors(final int
expectedNumberOfProcessors) {
+ public List<MockProcessor<KIn, VIn, KOut, VOut>> capturedProcessors(final
int expectedNumberOfProcessors) {
assertEquals(expectedNumberOfProcessors, processors.size());
return processors;