This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f2679bebf6 MINOR: update SuppressionDurabilityIntegrationTest (#16740)
8f2679bebf6 is described below
commit 8f2679bebf6d5cb59065d2ab9b9d5604c72175b3
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:28:09 2024 -0700
MINOR: update SuppressionDurabilityIntegrationTest (#16740)
Refactor test to move off deprecated `transform()` in favor of `process()`.
Reviewers: Bill Bejeck <[email protected]>
---
.../SuppressionDurabilityIntegrationTest.java | 33 +++++++++-------------
1 file changed, 14 insertions(+), 19 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 497b98ae354..37655aebce7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -38,9 +37,10 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.TestUtils;
@@ -141,12 +141,12 @@ public class SuppressionDurabilityIntegrationTest {
final MetadataValidator metadataValidator = new MetadataValidator(input);
suppressedCounts
- .transform(metadataValidator)
+ .process(metadataValidator)
.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
- .transform(metadataValidator)
+ .process(metadataValidator)
.to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
final Properties streamsConfig = mkProperties(mkMap(
@@ -252,7 +252,7 @@ public class SuppressionDurabilityIntegrationTest {
}
}
- private static final class MetadataValidator implements TransformerSupplier<String, Long, KeyValue<String, Long>> {
+ private static final class MetadataValidator implements ProcessorSupplier<String, Long, String, Long> {
private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class);
private final AtomicReference<Throwable> firstException = new AtomicReference<>();
private final String topic;
@@ -262,29 +262,24 @@ public class SuppressionDurabilityIntegrationTest {
}
@Override
- public Transformer<String, Long, KeyValue<String, Long>> get() {
- return new Transformer<String, Long, KeyValue<String, Long>>() {
- private ProcessorContext context;
+ public Processor<String, Long, String, Long> get() {
+ return new Processor<String, Long, String, Long>() {
+ private ProcessorContext<String, Long> context;
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<String, Long> context) {
this.context = context;
}
@Override
- public KeyValue<String, Long> transform(final String key, final Long value) {
+ public void process(final Record<String, Long> record) {
try {
- assertThat(context.topic(), equalTo(topic));
+ assertThat(context.recordMetadata().get().topic(), equalTo(topic));
} catch (final Throwable e) {
firstException.compareAndSet(null, e);
LOG.error("Validation Failed", e);
}
- return new KeyValue<>(key, value);
- }
-
- @Override
- public void close() {
-
+ context.forward(record);
}
};
}