This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce5491b7bf8 KAFKA-16331: remove KafkaClientSupplier from
StreamsProducer (#17259)
ce5491b7bf8 is described below
commit ce5491b7bf80b39f402d8914e19646663180ef40
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Sep 26 17:07:00 2024 -0700
KAFKA-16331: remove KafkaClientSupplier from StreamsProducer (#17259)
With EOSv1 removed, we don't need to create a producer per task, and thus
can simplify the code by removing KafkaClientSupplier from the deeply nested
StreamsProducer.
Reviewers: Bill Bejeck <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../processor/internals/ActiveTaskCreator.java | 96 ++++-----
.../streams/processor/internals/ClientUtils.java | 3 +-
.../processor/internals/RecordCollectorImpl.java | 6 +-
.../streams/processor/internals/StreamThread.java | 1 +
.../processor/internals/StreamsProducer.java | 55 +-----
.../apache/kafka/streams/StreamsConfigTest.java | 2 +-
.../processor/internals/ActiveTaskCreatorTest.java | 1 +
.../processor/internals/RecordCollectorTest.java | 89 +++------
.../processor/internals/StreamsProducerTest.java | 214 ++++++---------------
.../streams/state/KeyValueStoreTestDriver.java | 9 +-
.../StreamThreadStateStoreProviderTest.java | 7 +-
.../apache/kafka/streams/TopologyTestDriver.java | 31 +--
13 files changed, 141 insertions(+), 375 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 14d0630d2e9..a1c941996c9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
files="KTableImpl.java"/>
<suppress checks="ParameterNumber"
- files="StreamThread.java"/>
+ files="(StreamThread|ActiveTaskCreator).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 811c8c74543..1bbb4b921ba 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -37,7 +39,6 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,7 +47,7 @@ import java.util.UUID;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
-import static
org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;
+import static
org.apache.kafka.streams.processor.internals.ClientUtils.producerClientId;
class ActiveTaskCreator {
private final TopologyMetadata topologyMetadata;
@@ -58,10 +59,13 @@ class ActiveTaskCreator {
private final Time time;
private final KafkaClientSupplier clientSupplier;
private final String threadId;
+ private final int threadIdx;
+ private final UUID processId;
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer threadProducer;
- private final Map<TaskId, StreamsProducer> taskProducers;
+ // TODO remove `taskProducers`
+ private final Map<TaskId, StreamsProducer> taskProducers =
Collections.emptyMap();
private final ProcessingMode processingMode;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@@ -75,11 +79,11 @@ class ActiveTaskCreator {
final Time time,
final KafkaClientSupplier clientSupplier,
final String threadId,
+ final int threadIdx,
final UUID processId,
final Logger log,
final boolean stateUpdaterEnabled,
- final boolean processingThreadsEnabled
- ) {
+ final boolean processingThreadsEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
@@ -89,6 +93,8 @@ class ActiveTaskCreator {
this.time = time;
this.clientSupplier = clientSupplier;
this.threadId = threadId;
+ this.threadIdx = threadIdx;
+ this.processId = processId;
this.log = log;
this.stateUpdaterEnabled = stateUpdaterEnabled;
this.processingThreadsEnabled = processingThreadsEnabled;
@@ -96,28 +102,30 @@ class ActiveTaskCreator {
createTaskSensor = ThreadMetrics.createTaskSensor(threadId,
streamsMetrics);
processingMode = processingMode(applicationConfig);
- if (processingMode == EXACTLY_ONCE_ALPHA) {
- threadProducer = null;
- taskProducers = new HashMap<>();
- } else { // non-eos and eos-v2
- log.info("Creating thread producer client");
+ final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
+ final LogContext logContext = new LogContext(threadIdPrefix);
- final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
- final LogContext logContext = new LogContext(threadIdPrefix);
+ threadProducer = new StreamsProducer(
+ processingMode,
+ producer(),
+ logContext,
+ time
+ );
+ }
- threadProducer = new StreamsProducer(
- applicationConfig,
- threadId,
- clientSupplier,
- processId,
- logContext,
- time);
- taskProducers = Collections.emptyMap();
+ private Producer<byte[], byte[]> producer() {
+ final Map<String, Object> producerConfig =
applicationConfig.getProducerConfigs(producerClientId(threadId));
+ if (eosEnabled(processingMode)) {
+ producerConfig.put(
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+
applicationConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-" +
processId + "-" + threadIdx
+ );
}
+ return clientSupplier.getProducer(producerConfig);
}
public void reInitializeThreadProducer() {
- threadProducer.resetProducer();
+ threadProducer.resetProducer(producer());
}
StreamsProducer streamsProducerForTask(final TaskId taskId) {
@@ -187,26 +195,10 @@ class ActiveTaskCreator {
private RecordCollector createRecordCollector(final TaskId taskId,
final LogContext logContext,
final ProcessorTopology
topology) {
- final StreamsProducer streamsProducer;
- if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
- log.info("Creating producer client for task {}", taskId);
- streamsProducer = new StreamsProducer(
- applicationConfig,
- threadId,
- clientSupplier,
- null,
- logContext,
- time
- );
- taskProducers.put(taskId, streamsProducer);
- } else {
- streamsProducer = threadProducer;
- }
-
return new RecordCollectorImpl(
logContext,
taskId,
- streamsProducer,
+ this.threadProducer,
applicationConfig.productionExceptionHandler(),
streamsMetrics,
topology
@@ -282,13 +274,12 @@ class ActiveTaskCreator {
return task;
}
+ // TODO: rename and revisit test
void closeThreadProducerIfNeeded() {
- if (threadProducer != null) {
- try {
- threadProducer.close();
- } catch (final RuntimeException e) {
- throw new StreamsException("Thread producer encounter error
trying to close.", e);
- }
+ try {
+ threadProducer.close();
+ } catch (final RuntimeException e) {
+ throw new StreamsException("Thread producer encounter error trying
to close.", e);
}
}
@@ -304,17 +295,11 @@ class ActiveTaskCreator {
}
Map<MetricName, Metric> producerMetrics() {
- // When EOS is turned on, each task will have its own producer client
- // and the producer object passed in here will be null. We would then
iterate through
- // all the active tasks and add their metrics to the output metrics
map.
- final Collection<StreamsProducer> producers = threadProducer != null ?
- Collections.singleton(threadProducer) :
- taskProducers.values();
- return ClientUtils.producerMetrics(producers);
+ return
ClientUtils.producerMetrics(Collections.singleton(threadProducer));
}
String producerClientIds() {
- return threadProducerClientId(threadId);
+ return producerClientId(threadId);
}
private LogContext getLogContext(final TaskId taskId) {
@@ -324,11 +309,6 @@ class ActiveTaskCreator {
}
public double totalProducerBlockedTime() {
- if (threadProducer != null) {
- return threadProducer.totalBlockedTime();
- }
- return taskProducers.values().stream()
- .mapToDouble(StreamsProducer::totalBlockedTime)
- .sum();
+ return threadProducer.totalBlockedTime();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index 0fb9a17d2d2..87a25e65b62 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -64,7 +64,6 @@ public class ClientUtils {
}
}
-
public static String adminClientId(final String clientId) {
return clientId + "-admin";
}
@@ -77,7 +76,7 @@ public class ClientUtils {
return threadClientId + "-restore-consumer";
}
- public static String threadProducerClientId(final String threadClientId) {
+ public static String producerClientId(final String threadClientId) {
return threadClientId + "-producer";
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index a79e4073b60..79074d20dcb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -75,7 +75,6 @@ public class RecordCollectorImpl implements RecordCollector {
private final TaskId taskId;
private final StreamsProducer streamsProducer;
private final ProductionExceptionHandler productionExceptionHandler;
- private final boolean eosEnabled;
private final Map<TopicPartition, Long> offsets;
private final StreamsMetricsImpl streamsMetrics;
@@ -98,7 +97,6 @@ public class RecordCollectorImpl implements RecordCollector {
this.streamsProducer = streamsProducer;
this.sendException = streamsProducer.sendException();
this.productionExceptionHandler = productionExceptionHandler;
- this.eosEnabled = streamsProducer.eosEnabled();
this.streamsMetrics = streamsMetrics;
final String threadId = Thread.currentThread().getName();
@@ -121,7 +119,7 @@ public class RecordCollectorImpl implements RecordCollector
{
@Override
public void initialize() {
- if (eosEnabled) {
+ if (streamsProducer.eosEnabled()) {
streamsProducer.initTransaction();
}
}
@@ -528,7 +526,7 @@ public class RecordCollectorImpl implements RecordCollector
{
public void closeDirty() {
log.info("Closing record collector dirty");
- if (eosEnabled) {
+ if (streamsProducer.eosEnabled()) {
// We may be closing dirty because the commit failed, so we must
abort the transaction to be safe
streamsProducer.abortTransaction();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 864f7e3ed26..5c58fce1e99 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -408,6 +408,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
time,
clientSupplier,
threadId,
+ threadIdx,
processId,
log,
stateUpdaterEnabled,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 8ce954a7c35..98960dc2b6e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -37,8 +37,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KafkaClientSupplier;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
@@ -49,13 +47,11 @@ import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
-import static
org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;
/**
* {@code StreamsProducer} manages the producers within a Kafka Streams
application.
@@ -69,8 +65,6 @@ public class StreamsProducer {
private final Logger log;
private final String logPrefix;
- private final Map<String, Object> eosV2ProducerConfigs;
- private final KafkaClientSupplier clientSupplier;
private final ProcessingMode processingMode;
private final Time time;
@@ -80,48 +74,15 @@ public class StreamsProducer {
private double oldProducerTotalBlockedTime = 0;
private final AtomicReference<KafkaException> sendException = new
AtomicReference<>(null);
- public StreamsProducer(final StreamsConfig config,
- final String threadId,
- final KafkaClientSupplier clientSupplier,
- final UUID processId,
+ public StreamsProducer(final ProcessingMode processingMode,
+ final Producer<byte[], byte[]> producer,
final LogContext logContext,
final Time time) {
- Objects.requireNonNull(config, "config cannot be null");
- Objects.requireNonNull(threadId, "threadId cannot be null");
- this.clientSupplier = Objects.requireNonNull(clientSupplier,
"clientSupplier cannot be null");
+ this.processingMode = Objects.requireNonNull(processingMode,
"processingMode cannot be null");
+ this.producer = Objects.requireNonNull(producer, "producer cannot be
null");
log = Objects.requireNonNull(logContext, "logContext cannot be
null").logger(getClass());
logPrefix = logContext.logPrefix().trim();
- this.time = Objects.requireNonNull(time, "time");
-
- processingMode = StreamsConfigUtils.processingMode(config);
-
- final Map<String, Object> producerConfigs;
- switch (processingMode) {
- case AT_LEAST_ONCE: {
- producerConfigs =
config.getProducerConfigs(threadProducerClientId(threadId));
- eosV2ProducerConfigs = null;
-
- break;
- }
- case EXACTLY_ONCE_V2: {
- producerConfigs =
config.getProducerConfigs(threadProducerClientId(threadId));
-
- final String applicationId =
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- producerConfigs.put(
- ProducerConfig.TRANSACTIONAL_ID_CONFIG,
- applicationId + "-" +
- Objects.requireNonNull(processId, "processId cannot be
null for exactly-once v2") +
- "-" + threadId.split("-StreamThread-")[1]);
-
- eosV2ProducerConfigs = producerConfigs;
-
- break;
- }
- default:
- throw new IllegalArgumentException("Unknown processing mode: "
+ processingMode);
- }
-
- producer = clientSupplier.getProducer(producerConfigs);
+ this.time = Objects.requireNonNull(time, "time cannot be null");
}
private String formatException(final String message) {
@@ -171,18 +132,18 @@ public class StreamsProducer {
}
}
- public void resetProducer() {
+ public void resetProducer(final Producer<byte[], byte[]> producer) {
if (processingMode != EXACTLY_ONCE_V2) {
throw new IllegalStateException("Expected eos-v2 to be enabled,
but the processing mode was " + processingMode);
}
- oldProducerTotalBlockedTime += totalBlockedTime(producer);
+ oldProducerTotalBlockedTime += totalBlockedTime(this.producer);
final long start = time.nanoseconds();
close();
final long closeTime = time.nanoseconds() - start;
oldProducerTotalBlockedTime += closeTime;
- producer = clientSupplier.getProducer(eosV2ProducerConfigs);
+ this.producer = producer;
}
private double getMetricValue(final Map<MetricName, ? extends Metric>
metrics,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 9625c4b6d61..ffb1e449622 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -103,7 +103,7 @@ public class StreamsConfigTest {
@BeforeEach
public void setUp() {
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class.getName());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index c1f58b3972d..e2d5bf74687 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -310,6 +310,7 @@ public class ActiveTaskCreatorTest {
new MockTime(),
mockClientSupplier,
"clientId-StreamThread-0",
+ 0,
uuid,
new LogContext().logger(ActiveTaskCreator.class),
false,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 353289fc72b..912a537b6fb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
@@ -56,6 +55,7 @@ import
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationE
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -78,7 +78,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -141,7 +140,6 @@ public class RecordCollectorTest {
private final StringSerializer stringSerializer = new StringSerializer();
private final ByteArraySerializer byteArraySerializer = new
ByteArraySerializer();
- private final UUID processId = UUID.randomUUID();
private final StreamPartitioner<String, Object> streamPartitioner =
(topic, key, value, numPartitions) ->
Optional.of(Collections.singleton(Integer.parseInt(key) % numPartitions));
@@ -157,15 +155,13 @@ public class RecordCollectorTest {
public void setup() {
final MockClientSupplier clientSupplier = new MockClientSupplier();
clientSupplier.setCluster(cluster);
+ mockProducer = (MockProducer<byte[], byte[]>)
clientSupplier.getProducer(config.originals());
streamsProducer = new StreamsProducer(
- config,
- processId + "-StreamThread-1",
- clientSupplier,
- processId,
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ mockProducer,
logContext,
Time.SYSTEM
);
- mockProducer = clientSupplier.producers.get(0);
final SinkNode<?, ?> sinkNode = new SinkNode<>(
sinkNodeName,
new StaticTopicNameExtractor<>(topic),
@@ -734,7 +730,6 @@ public class RecordCollectorTest {
@Test
public void shouldForwardFlushToStreamsProducer() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
- when(streamsProducer.eosEnabled()).thenReturn(false);
doNothing().when(streamsProducer).flush();
when(streamsProducer.sendException()).thenReturn(new
AtomicReference<>(null));
final ProcessorTopology topology = mock(ProcessorTopology.class);
@@ -755,7 +750,6 @@ public class RecordCollectorTest {
@Test
public void shouldForwardFlushToStreamsProducerEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
- when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new
AtomicReference<>(null));
doNothing().when(streamsProducer).flush();
final ProcessorTopology topology = mock(ProcessorTopology.class);
@@ -784,7 +778,6 @@ public class RecordCollectorTest {
private void shouldClearOffsetsOnClose(final boolean clean) {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
- when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new
AtomicReference<>(null));
final long offset = 1234L;
final RecordMetadata metadata = new RecordMetadata(
@@ -836,7 +829,6 @@ public class RecordCollectorTest {
@Test
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
- when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new
AtomicReference<>(null));
final ProcessorTopology topology = mock(ProcessorTopology.class);
@@ -1449,20 +1441,13 @@ public class RecordCollectorTest {
logContext,
taskId,
new StreamsProducer(
- eosConfig,
- "-StreamThread-1",
- new MockClientSupplier() {
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
@Override
- public Producer<byte[], byte[]> getProducer(final
Map<String, Object> config) {
- return new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
- @Override
- public void abortTransaction() {
- functionCalled.set(true);
- }
- };
+ public void abortTransaction() {
+ functionCalled.set(true);
}
},
- processId,
logContext,
Time.SYSTEM
),
@@ -1481,20 +1466,13 @@ public class RecordCollectorTest {
logContext,
taskId,
new StreamsProducer(
- config,
- processId + "-StreamThread-1",
- new MockClientSupplier() {
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
@Override
- public Producer<byte[], byte[]> getProducer(final
Map<String, Object> config) {
- return new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
- @Override
- public List<PartitionInfo> partitionsFor(final
String topic) {
- return Collections.emptyList();
- }
- };
+ public List<PartitionInfo> partitionsFor(final String
topic) {
+ return Collections.emptyList();
}
},
- null,
logContext,
Time.SYSTEM
),
@@ -1521,15 +1499,8 @@ public class RecordCollectorTest {
logContext,
taskId,
new StreamsProducer(
- eosConfig,
- processId + "-StreamThread-1",
- new MockClientSupplier() {
- @Override
- public Producer<byte[], byte[]> getProducer(final
Map<String, Object> config) {
- return mockProducer;
- }
- },
- processId,
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ mockProducer,
logContext,
Time.SYSTEM
),
@@ -1924,21 +1895,14 @@ public class RecordCollectorTest {
private StreamsProducer getExceptionalStreamsProducerOnSend(final
Exception exception) {
return new StreamsProducer(
- config,
- processId + "-StreamThread-1",
- new MockClientSupplier() {
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
@Override
- public Producer<byte[], byte[]> getProducer(final Map<String,
Object> config) {
- return new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
- @Override
- public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
- callback.onCompletion(null, exception);
- return null;
- }
- };
+ public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ callback.onCompletion(null, exception);
+ return null;
}
},
- null,
logContext,
Time.SYSTEM
);
@@ -1946,20 +1910,13 @@ public class RecordCollectorTest {
private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final
RuntimeException exception) {
return new StreamsProducer(
- config,
- processId + "-StreamThread-1",
- new MockClientSupplier() {
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
@Override
- public Producer<byte[], byte[]> getProducer(final Map<String,
Object> config) {
- return new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
- @Override
- public synchronized List<PartitionInfo>
partitionsFor(final String topic) {
- throw exception;
- }
- };
+ public synchronized List<PartitionInfo> partitionsFor(final
String topic) {
+ throw exception;
}
},
- null,
logContext,
Time.SYSTEM
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index f914a5272c4..6f2d37b3b3a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -37,10 +37,10 @@ import
org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.test.MockClientSupplier;
import org.junit.jupiter.api.BeforeEach;
@@ -63,8 +63,6 @@ import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -111,25 +109,15 @@ public class StreamsProducerTest {
@SuppressWarnings("unchecked")
final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
- final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
- @Override
- public Producer<byte[], byte[]> getProducer(final Map<String, Object>
config) {
- return mockedProducer;
- }
- };
final StreamsProducer streamsProducerWithMock = new StreamsProducer(
- nonEosConfig,
- "threadId",
- clientSupplier,
- null,
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ mockedProducer,
logContext,
mockTime
);
final StreamsProducer eosStreamsProducerWithMock = new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ mockedProducer,
logContext,
mockTime
);
@@ -152,30 +140,29 @@ public class StreamsProducerTest {
@BeforeEach
public void before() {
mockClientSupplier.setCluster(cluster);
+ nonEosMockProducer = (MockProducer<byte[], byte[]>)
mockClientSupplier.getProducer(nonEosConfig.originals());
nonEosStreamsProducer =
new StreamsProducer(
- nonEosConfig,
- "threadId-StreamThread-0",
- mockClientSupplier,
- null,
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ nonEosMockProducer,
logContext,
mockTime
);
- nonEosMockProducer = mockClientSupplier.producers.get(0);
eosMockClientSupplier.setCluster(cluster);
eosMockClientSupplier.setApplicationIdForProducer("appId");
+ final String clientId = "threadId-StreamThread-0";
+ final Map<String, Object> producerConfig =
eosConfig.getProducerConfigs(clientId);
+ producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" +
UUID.randomUUID() + "-0");
+ eosMockProducer = (MockProducer<byte[], byte[]>)
eosMockClientSupplier.getProducer(producerConfig);
eosStreamsProducer =
new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- eosMockClientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ eosMockProducer,
logContext,
mockTime
);
eosStreamsProducer.initTransaction();
- eosMockProducer = eosMockClientSupplier.producers.get(0);
when(mockTime.nanoseconds()).thenReturn(Time.SYSTEM.nanoseconds());
}
@@ -202,12 +189,11 @@ public class StreamsProducerTest {
@Test
public void shouldResetTransactionInFlightOnReset() {
// given:
- eosStreamsProducer.send(
- new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> {
});
+ eosStreamsProducer.send(new ProducerRecord<>("topic", new byte[1]),
(metadata, error) -> { });
assertThat(eosStreamsProducer.transactionInFlight(), is(true));
// when:
- eosStreamsProducer.resetProducer();
+ eosStreamsProducer.resetProducer(null);
// then:
assertThat(eosStreamsProducer.transactionInFlight(), is(false));
@@ -252,74 +238,66 @@ public class StreamsProducerTest {
// error handling tests
@Test
- public void shouldFailIfStreamsConfigIsNull() {
+ public void shouldFailIfProcessingModeIsNull() {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
null,
- "threadId-StreamThread-0",
- mockClientSupplier,
- UUID.randomUUID(),
+ mockedProducer,
logContext,
mockTime)
);
- assertThat(thrown.getMessage(), is("config cannot be null"));
+ assertThat(thrown.getMessage(), is("processingMode cannot be null"));
}
@Test
- public void shouldFailIfThreadIdIsNull() {
+ public void shouldFailIfProducerIsNull() {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- nonEosConfig,
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
null,
- mockClientSupplier,
- UUID.randomUUID(),
logContext,
mockTime)
);
- assertThat(thrown.getMessage(), is("threadId cannot be null"));
+ assertThat(thrown.getMessage(), is("producer cannot be null"));
}
@Test
- public void shouldFailIfClientSupplierIsNull() {
+ public void shouldFailIfLogContextIsNull() {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- nonEosConfig,
- "threadId-StreamThread-0",
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ mockedProducer,
null,
- UUID.randomUUID(),
- logContext,
mockTime)
);
- assertThat(thrown.getMessage(), is("clientSupplier cannot be null"));
+ assertThat(thrown.getMessage(), is("logContext cannot be null"));
}
@Test
- public void shouldFailIfLogContextIsNull() {
+ public void shouldFailIfTimeIsNull() {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- nonEosConfig,
- "threadId-StreamThread-0",
- mockClientSupplier,
- UUID.randomUUID(),
- null,
- mockTime)
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ mockedProducer,
+ logContext,
+ null)
);
- assertThat(thrown.getMessage(), is("logContext cannot be null"));
+ assertThat(thrown.getMessage(), is("time cannot be null"));
}
@Test
public void shouldFailOnResetProducerForAtLeastOnce() {
final IllegalStateException thrown = assertThrows(
IllegalStateException.class,
- () -> nonEosStreamsProducer.resetProducer()
+ () -> nonEosStreamsProducer.resetProducer(null)
);
assertThat(thrown.getMessage(), is("Expected eos-v2 to be enabled, but the processing mode was AT_LEAST_ONCE"));
@@ -330,25 +308,6 @@ public class StreamsProducerTest {
// functional tests
- @Test
- public void shouldNotSetTransactionIdIfEosDisabled() {
- final Map<String, Object> producerConfig = new HashMap<>();
- final StreamsConfig mockConfig = mock(StreamsConfig.class);
- when(mockConfig.getProducerConfigs("threadId-producer")).thenReturn(producerConfig);
- when(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).thenReturn(StreamsConfig.AT_LEAST_ONCE);
-
- new StreamsProducer(
- mockConfig,
- "threadId",
- mockClientSupplier,
- null,
- logContext,
- mockTime
- );
-
- assertFalse(producerConfig.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
- }
-
@Test
public void shouldNotHaveEosEnabledIfEosDisabled() {
assertThat(nonEosStreamsProducer.eosEnabled(), is(false));
@@ -439,27 +398,6 @@ public class StreamsProducerTest {
assertThat(eosStreamsProducer.eosEnabled(), is(true));
}
- @Test
- public void shouldSetTransactionIdUsingProcessIdIfEosV2Enabled() {
- final UUID processId = UUID.randomUUID();
- final Map<String, Object> producerConfig = new HashMap<>();
- final StreamsConfig mockConfig = mock(StreamsConfig.class);
- when(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).thenReturn(producerConfig);
- when(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("appId");
- when(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).thenReturn(StreamsConfig.EXACTLY_ONCE_V2);
-
- new StreamsProducer(
- mockConfig,
- "threadId-StreamThread-0",
- eosMockClientSupplier,
- processId,
- logContext,
- mockTime
- );
-
- assertEquals("appId-" + processId + "-0", producerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
- }
-
@Test
public void shouldHaveEosEnabledIfEosEnabled() {
assertThat(eosStreamsProducer.eosEnabled(), is(true));
@@ -531,10 +469,8 @@ public class StreamsProducerTest {
when(mockedProducer.send(record, null)).thenReturn(null);
final StreamsProducer streamsProducer = new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ mockedProducer,
logContext,
mockTime
);
@@ -576,38 +512,14 @@ public class StreamsProducerTest {
// error handling tests
- @Test
- public void shouldFailIfProcessIdNullForEos() {
- final NullPointerException thrown = assertThrows(
- NullPointerException.class,
- () -> new StreamsProducer(
- eosConfig,
- "threadId",
- mockClientSupplier,
- null,
- logContext,
- mockTime)
- );
-
- assertThat(thrown.getMessage(), is("processId cannot be null for exactly-once v2"));
- }
-
@Test
public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() {
// use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init
nonEosMockProducer.initTransactionException = new TimeoutException("KABOOM!");
- final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
- @Override
- public Producer<byte[], byte[]> getProducer(final Map<String,
Object> config) {
- return nonEosMockProducer;
- }
- };
final StreamsProducer streamsProducer = new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ nonEosMockProducer,
logContext,
mockTime
);
@@ -622,12 +534,11 @@ public class StreamsProducerTest {
@Test
public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForEos() {
+ // use `nonEosMockProducer` instead of `eosMockProducer` to avoid auto-init Tx
final StreamsProducer streamsProducer =
new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- eosMockClientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ nonEosMockProducer,
logContext,
mockTime
);
@@ -644,18 +555,10 @@ public class StreamsProducerTest {
public void shouldThrowStreamsExceptionOnEosInitError() {
// use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init
nonEosMockProducer.initTransactionException = new KafkaException("KABOOM!");
- final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
- @Override
- public Producer<byte[], byte[]> getProducer(final Map<String,
Object> config) {
- return nonEosMockProducer;
- }
- };
final StreamsProducer streamsProducer = new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ nonEosMockProducer,
logContext,
mockTime
);
@@ -673,18 +576,10 @@ public class StreamsProducerTest {
public void shouldFailOnEosInitFatal() {
// use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init
nonEosMockProducer.initTransactionException = new RuntimeException("KABOOM!");
- final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
- @Override
- public Producer<byte[], byte[]> getProducer(final Map<String,
Object> config) {
- return nonEosMockProducer;
- }
- };
final StreamsProducer streamsProducer = new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ nonEosMockProducer,
logContext,
mockTime
);
@@ -986,26 +881,25 @@ public class StreamsProducerTest {
@Test
public void shouldCloseExistingProducerOnResetProducer() {
- eosStreamsProducer.resetProducer();
+ eosStreamsProducer.resetProducer(null);
assertTrue(eosMockProducer.closed());
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldSetNewProducerOnResetProducer() {
- eosStreamsProducer.resetProducer();
+ final Producer<byte[], byte[]> newProducer = mock(Producer.class);
+ eosStreamsProducer.resetProducer(newProducer);
- assertThat(eosMockClientSupplier.producers.size(), is(2));
- assertThat(eosStreamsProducer.kafkaProducer(), is(eosMockClientSupplier.producers.get(1)));
+ assertThat(eosStreamsProducer.kafkaProducer(), is(newProducer));
}
@Test
public void shouldResetTransactionInitializedOnResetProducer() {
final StreamsProducer streamsProducer = new StreamsProducer(
- eosConfig,
- "threadId-StreamThread-0",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+ mockedProducer,
logContext,
mockTime
);
@@ -1013,7 +907,7 @@ public class StreamsProducerTest {
when(mockedProducer.metrics()).thenReturn(Collections.emptyMap());
- streamsProducer.resetProducer();
+ streamsProducer.resetProducer(mockedProducer);
streamsProducer.initTransaction();
verify(mockedProducer).close();
@@ -1064,9 +958,9 @@ public class StreamsProducerTest {
final long closeStart = 1L;
final long clodeDelay = 1L;
when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + clodeDelay);
- eosStreamsProducer.resetProducer();
+ eosStreamsProducer.resetProducer(eosMockProducer);
setProducerMetrics(
- eosMockClientSupplier.producers.get(1),
+ eosMockProducer,
BUFFER_POOL_WAIT_TIME,
FLUSH_TME,
TXN_INIT_TIME,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0fac1867c43..6c264701f80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
@@ -26,6 +27,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -41,7 +43,6 @@ import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
@@ -212,10 +213,8 @@ public class KeyValueStoreTestDriver<K, V> {
logContext,
new TaskId(0, 0),
new StreamsProducer(
- new StreamsConfig(props),
- "threadId",
- new MockClientSupplier(),
- null,
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ new MockProducer<>(null, true, null, null),
logContext,
Time.SYSTEM),
new DefaultProductionExceptionHandler(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c947bcb926b..cfebdcf220a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -81,7 +81,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.UUID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
@@ -434,10 +433,8 @@ public class StreamThreadStateStoreProviderTest {
logContext,
taskId,
new StreamsProducer(
- streamsConfig,
- "threadId",
- clientSupplier,
- UUID.randomUUID(),
+ StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+ clientSupplier.getProducer(streamsConfig.originals()),
logContext,
Time.SYSTEM
),
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 884fa2c7dda..6c797a0a280 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -109,7 +109,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -347,28 +346,8 @@ public class TopologyTestDriver implements Closeable {
}
};
testDriverProducer = new TestDriverProducer(
- streamsConfig,
- new KafkaClientSupplier() {
- @Override
- public Producer<byte[], byte[]> getProducer(final Map<String,
Object> config) {
- return producer;
- }
-
- @Override
- public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
- throw new IllegalStateException();
- }
-
- @Override
- public Consumer<byte[], byte[]> getRestoreConsumer(final
Map<String, Object> config) {
- throw new IllegalStateException();
- }
-
- @Override
- public Consumer<byte[], byte[]> getGlobalConsumer(final
Map<String, Object> config) {
- throw new IllegalStateException();
- }
- },
+ StreamsConfigUtils.processingMode(streamsConfig),
+ producer,
logContext,
mockWallClockTime
);
@@ -1373,11 +1352,11 @@ public class TopologyTestDriver implements Closeable {
private static class TestDriverProducer extends StreamsProducer {
- public TestDriverProducer(final StreamsConfig config,
- final KafkaClientSupplier clientSupplier,
+ public TestDriverProducer(final StreamsConfigUtils.ProcessingMode processingMode,
+ final Producer<byte[], byte[]> producer,
final LogContext logContext,
final Time time) {
- super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, UUID.randomUUID(), logContext, time);
+ super(processingMode, producer, logContext, time);
}
@Override