This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1528264f021 MINOR: update EosIntegrationTest (#16697)
1528264f021 is described below
commit 1528264f0217820a86f01964219dbe11b3a60c0e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:25:44 2024 -0700
MINOR: update EosIntegrationTest (#16697)
Refactor the test to move off the deprecated `KStream#transform()` in favor of
`KStream#process()`.
Reviewers: Bill Bejeck <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 125 ++++++++++-----------
1 file changed, 59 insertions(+), 66 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 38e7e5cc0ca..94e48ee3d49 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -43,12 +43,10 @@ import
org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.query.QueryResult;
@@ -863,8 +861,8 @@ public class EosIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
eosConfig);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
@@ -901,10 +899,10 @@ public class EosIntegrationTest {
.addSource("source", MULTI_PARTITION_INPUT_TOPIC)
.addProcessor("processor", () -> new Processor<Integer, String,
Integer, String>() {
KeyValueStore<Integer, String> stateStore;
-
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String>
context;
+ ProcessorContext<Integer, String> context;
@Override
- public void init(final
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String>
context) {
+ public void init(final ProcessorContext<Integer, String>
context) {
Processor.super.init(context);
this.context = context;
stateStore = context.getStateStore(stateStoreName);
@@ -1014,7 +1012,7 @@ public class EosIntegrationTest {
final long topicEndOffset = consumer.position(tp);
assertTrue(topicEndOffset >= checkpointedOffset,
- "changelog topic end " + topicEndOffset + " is less than
checkpointed offset " + checkpointedOffset);
+ "changelog topic end " + topicEndOffset + " is less than
checkpointed offset " + checkpointedOffset);
consumer.seekToBeginning(partitions);
@@ -1070,80 +1068,75 @@ public class EosIntegrationTest {
}
final KStream<Long, Long> input =
builder.stream(MULTI_PARTITION_INPUT_TOPIC);
- input.transform(new TransformerSupplier<Long, Long, KeyValue<Long,
Long>>() {
- @SuppressWarnings("unchecked")
- @Override
- public Transformer<Long, Long, KeyValue<Long, Long>> get() {
- return new Transformer<Long, Long, KeyValue<Long, Long>>() {
- ProcessorContext context;
- KeyValueStore<Long, Long> state = null;
+ input.process(() -> new Processor<Long, Long, Long, Long>() {
+ ProcessorContext<Long, Long> context;
+ KeyValueStore<Long, Long> state = null;
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
+ @Override
+ public void init(final ProcessorContext<Long, Long> context) {
+ this.context = context;
- if (withState) {
- state = context.getStateStore(storeName);
- }
+ if (withState) {
+ state = context.getStateStore(storeName);
}
+ }
- @Override
- public KeyValue<Long, Long> transform(final Long key,
final Long value) {
- if (stallInjected.compareAndSet(true, false)) {
- LOG.info(dummyHostName + " is executing the
injected stall");
- stallingHost.set(dummyHostName);
- while (doStall) {
- final Thread thread = Thread.currentThread();
- if (thread.isInterrupted()) {
+ @Override
+ public void process(final Record<Long, Long> record) {
+ if (stallInjected.compareAndSet(true, false)) {
+ LOG.info(dummyHostName + " is executing the injected
stall");
+ stallingHost.set(dummyHostName);
+ while (doStall) {
+ final Thread thread = Thread.currentThread();
+ if (thread.isInterrupted()) {
+ throw new RuntimeException("Detected we've
been interrupted.");
+ }
+ if (!processingThreadsEnabled) {
+ if (!((StreamThread) thread).isRunning()) {
throw new RuntimeException("Detected we've
been interrupted.");
}
- if (!processingThreadsEnabled) {
- if (!((StreamThread) thread).isRunning()) {
- throw new RuntimeException("Detected
we've been interrupted.");
- }
- }
- try {
- Thread.sleep(100);
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- }
}
- }
-
- if ((value + 1) % 10 == 0) {
- context.commit();
- commitRequested.incrementAndGet();
- }
-
- if (state != null) {
- Long sum = state.get(key);
-
- if (sum == null) {
- sum = value;
- } else {
- sum += value;
+ try {
+ Thread.sleep(100);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
}
- state.put(key, sum);
- state.flush();
}
+ }
+ final long key = record.key();
+ final long value = record.value();
- if (errorInjected.compareAndSet(true, false)) {
- // only tries to fail once on one of the task
- throw new RuntimeException("Injected test
exception.");
- }
+ if ((value + 1) % 10 == 0) {
+ context.commit();
+ commitRequested.incrementAndGet();
+ }
+
+ if (state != null) {
+ Long sum = state.get(key);
- if (state != null) {
- return new KeyValue<>(key, state.get(key));
+ if (sum == null) {
+ sum = value;
} else {
- return new KeyValue<>(key, value);
+ sum += value;
}
+ state.put(key, sum);
+ state.flush();
}
- @Override
- public void close() { }
- };
- } }, storeNames)
+
+ if (errorInjected.compareAndSet(true, false)) {
+ // only tries to fail once on one of the task
+ throw new RuntimeException("Injected test exception.");
+ }
+
+ if (state != null) {
+ context.forward(record.withValue(state.get(key)));
+ } else {
+ context.forward(record);
+ }
+ }
+ }, storeNames)
.to(SINGLE_PARTITION_OUTPUT_TOPIC);
stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;