This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d6a41ac3ca3 MINOR: update EosV2UpgradeIntegrationTest (#16698)
d6a41ac3ca3 is described below
commit d6a41ac3ca3bbd733c3385861306cf04f2661a22
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:26:25 2024 -0700
MINOR: update EosV2UpgradeIntegrationTest (#16698)
Refactor test to move off deprecated `transform()` in favor of
`process()`.
Reviewers: Bill Bejeck <[email protected]>
---
.../integration/EosV2UpgradeIntegrationTest.java | 97 ++++++++++------------
1 file changed, 46 insertions(+), 51 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index 087a439d084..8653f69fca6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -40,9 +40,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -847,7 +847,6 @@ public class EosV2UpgradeIntegrationTest {
}
}
- @SuppressWarnings("deprecation")
private KafkaStreams getKafkaStreams(final String appDir,
final String processingGuarantee,
final boolean injectError) {
@@ -861,62 +860,58 @@ public class EosV2UpgradeIntegrationTest {
builder.addStateStore(storeBuilder);
final KStream<Long, Long> input =
builder.stream(MULTI_PARTITION_INPUT_TOPIC);
- input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
- @Override
- public Transformer<Long, Long, KeyValue<Long, Long>> get() {
- return new Transformer<Long, Long, KeyValue<Long, Long>>() {
- ProcessorContext context;
- KeyValueStore<Long, Long> state = null;
- AtomicBoolean crash;
- AtomicInteger sharedCommit;
-
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
- state = context.getStateStore(storeName);
- final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
- if (APP_DIR_1.equals(clientId)) {
- crash = errorInjectedClient1;
- sharedCommit = commitCounterClient1;
- } else {
- crash = errorInjectedClient2;
- sharedCommit = commitCounterClient2;
- }
+ input.process(() -> new Processor<Long, Long, Long, Long>() {
+ ProcessorContext<Long, Long> context;
+ KeyValueStore<Long, Long> state = null;
+ AtomicBoolean crash;
+ AtomicInteger sharedCommit;
+
+ @Override
+ public void init(final ProcessorContext<Long, Long> context) {
+ this.context = context;
+ state = context.getStateStore(storeName);
+ final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
+ if (APP_DIR_1.equals(clientId)) {
+ crash = errorInjectedClient1;
+ sharedCommit = commitCounterClient1;
+ } else {
+ crash = errorInjectedClient2;
+ sharedCommit = commitCounterClient2;
}
+ }
- @Override
- public KeyValue<Long, Long> transform(final Long key,
final Long value) {
- if ((value + 1) % 10 == 0) {
- if (sharedCommit.get() < 0 ||
- sharedCommit.incrementAndGet() == 2) {
+ @Override
+ public void process(final Record<Long, Long> record) {
+ final long key = record.key();
+ final long value = record.value();
- context.commit();
- }
- commitRequested.incrementAndGet();
- }
+ if ((value + 1) % 10 == 0) {
+ if (sharedCommit.get() < 0 ||
+ sharedCommit.incrementAndGet() == 2) {
- Long sum = state.get(key);
- if (sum == null) {
- sum = value;
- } else {
- sum += value;
+ context.commit();
}
- state.put(key, sum);
- state.flush();
+ commitRequested.incrementAndGet();
+ }
- if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
- crash != null && crash.compareAndSet(true, false)) {
- // only crash a single task
- throw new RuntimeException("Injected test exception.");
- }
+ Long sum = state.get(key);
+ if (sum == null) {
+ sum = value;
+ } else {
+ sum += value;
+ }
+ state.put(key, sum);
+ state.flush();
- return new KeyValue<>(key, state.get(key));
+ if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
+ crash != null && crash.compareAndSet(true, false)) {
+ // only crash a single task
+ throw new RuntimeException("Injected test exception.");
}
- @Override
- public void close() {}
- };
- } }, storeNames)
+ context.forward(record.withValue(state.get(key)));
+ }
+ }, storeNames)
.to(MULTI_PARTITION_OUTPUT_TOPIC);
final Properties properties = new Properties();