This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce5491b7bf8 KAFKA-16331: remove KafkaClientSupplier from 
StreamsProducer (#17259)
ce5491b7bf8 is described below

commit ce5491b7bf80b39f402d8914e19646663180ef40
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Sep 26 17:07:00 2024 -0700

    KAFKA-16331: remove KafkaClientSupplier from StreamsProducer (#17259)
    
    With EOSv1 removed, we don't need to create a producer per task, and thus 
can simplify the code by removing KafkaClientSupplier from the deeply nested 
StreamsProducer.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../processor/internals/ActiveTaskCreator.java     |  96 ++++-----
 .../streams/processor/internals/ClientUtils.java   |   3 +-
 .../processor/internals/RecordCollectorImpl.java   |   6 +-
 .../streams/processor/internals/StreamThread.java  |   1 +
 .../processor/internals/StreamsProducer.java       |  55 +-----
 .../apache/kafka/streams/StreamsConfigTest.java    |   2 +-
 .../processor/internals/ActiveTaskCreatorTest.java |   1 +
 .../processor/internals/RecordCollectorTest.java   |  89 +++------
 .../processor/internals/StreamsProducerTest.java   | 214 ++++++---------------
 .../streams/state/KeyValueStoreTestDriver.java     |   9 +-
 .../StreamThreadStateStoreProviderTest.java        |   7 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  31 +--
 13 files changed, 141 insertions(+), 375 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 14d0630d2e9..a1c941996c9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
               files="KTableImpl.java"/>
 
     <suppress checks="ParameterNumber"
-              files="StreamThread.java"/>
+              files="(StreamThread|ActiveTaskCreator).java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 811c8c74543..1bbb4b921ba 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -37,7 +39,6 @@ import org.slf4j.Logger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,7 +47,7 @@ import java.util.UUID;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
-import static 
org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;
+import static 
org.apache.kafka.streams.processor.internals.ClientUtils.producerClientId;
 
 class ActiveTaskCreator {
     private final TopologyMetadata topologyMetadata;
@@ -58,10 +59,13 @@ class ActiveTaskCreator {
     private final Time time;
     private final KafkaClientSupplier clientSupplier;
     private final String threadId;
+    private final int threadIdx;
+    private final UUID processId;
     private final Logger log;
     private final Sensor createTaskSensor;
     private final StreamsProducer threadProducer;
-    private final Map<TaskId, StreamsProducer> taskProducers;
+    // TODO remove `taskProducers`
+    private final Map<TaskId, StreamsProducer> taskProducers = 
Collections.emptyMap();
     private final ProcessingMode processingMode;
     private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
@@ -75,11 +79,11 @@ class ActiveTaskCreator {
                       final Time time,
                       final KafkaClientSupplier clientSupplier,
                       final String threadId,
+                      final int threadIdx,
                       final UUID processId,
                       final Logger log,
                       final boolean stateUpdaterEnabled,
-                      final boolean processingThreadsEnabled
-                      ) {
+                      final boolean processingThreadsEnabled) {
         this.topologyMetadata = topologyMetadata;
         this.applicationConfig = applicationConfig;
         this.streamsMetrics = streamsMetrics;
@@ -89,6 +93,8 @@ class ActiveTaskCreator {
         this.time = time;
         this.clientSupplier = clientSupplier;
         this.threadId = threadId;
+        this.threadIdx = threadIdx;
+        this.processId = processId;
         this.log = log;
         this.stateUpdaterEnabled = stateUpdaterEnabled;
         this.processingThreadsEnabled = processingThreadsEnabled;
@@ -96,28 +102,30 @@ class ActiveTaskCreator {
         createTaskSensor = ThreadMetrics.createTaskSensor(threadId, 
streamsMetrics);
         processingMode = processingMode(applicationConfig);
 
-        if (processingMode == EXACTLY_ONCE_ALPHA) {
-            threadProducer = null;
-            taskProducers = new HashMap<>();
-        } else { // non-eos and eos-v2
-            log.info("Creating thread producer client");
+        final String threadIdPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
+        final LogContext logContext = new LogContext(threadIdPrefix);
 
-            final String threadIdPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
-            final LogContext logContext = new LogContext(threadIdPrefix);
+        threadProducer = new StreamsProducer(
+            processingMode,
+            producer(),
+            logContext,
+            time
+        );
+    }
 
-            threadProducer = new StreamsProducer(
-                applicationConfig,
-                threadId,
-                clientSupplier,
-                processId,
-                logContext,
-                time);
-            taskProducers = Collections.emptyMap();
+    private Producer<byte[], byte[]> producer() {
+        final Map<String, Object> producerConfig = 
applicationConfig.getProducerConfigs(producerClientId(threadId));
+        if (eosEnabled(processingMode)) {
+            producerConfig.put(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                
applicationConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + 
processId + "-" + threadIdx
+            );
         }
+        return clientSupplier.getProducer(producerConfig);
     }
 
     public void reInitializeThreadProducer() {
-        threadProducer.resetProducer();
+        threadProducer.resetProducer(producer());
     }
 
     StreamsProducer streamsProducerForTask(final TaskId taskId) {
@@ -187,26 +195,10 @@ class ActiveTaskCreator {
     private RecordCollector createRecordCollector(final TaskId taskId,
                                                   final LogContext logContext,
                                                   final ProcessorTopology 
topology) {
-        final StreamsProducer streamsProducer;
-        if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
-            log.info("Creating producer client for task {}", taskId);
-            streamsProducer = new StreamsProducer(
-                applicationConfig,
-                threadId,
-                clientSupplier,
-                null,
-                logContext,
-                time
-            );
-            taskProducers.put(taskId, streamsProducer);
-        } else {
-            streamsProducer = threadProducer;
-        }
-
         return new RecordCollectorImpl(
             logContext,
             taskId,
-            streamsProducer,
+            this.threadProducer,
             applicationConfig.productionExceptionHandler(),
             streamsMetrics,
             topology
@@ -282,13 +274,12 @@ class ActiveTaskCreator {
         return task;
     }
 
+    // TODO: rename and revisit test
     void closeThreadProducerIfNeeded() {
-        if (threadProducer != null) {
-            try {
-                threadProducer.close();
-            } catch (final RuntimeException e) {
-                throw new StreamsException("Thread producer encounter error 
trying to close.", e);
-            }
+        try {
+            threadProducer.close();
+        } catch (final RuntimeException e) {
+            throw new StreamsException("Thread producer encounter error trying 
to close.", e);
         }
     }
 
@@ -304,17 +295,11 @@ class ActiveTaskCreator {
     }
 
     Map<MetricName, Metric> producerMetrics() {
-        // When EOS is turned on, each task will have its own producer client
-        // and the producer object passed in here will be null. We would then 
iterate through
-        // all the active tasks and add their metrics to the output metrics 
map.
-        final Collection<StreamsProducer> producers = threadProducer != null ?
-            Collections.singleton(threadProducer) :
-            taskProducers.values();
-        return ClientUtils.producerMetrics(producers);
+        return 
ClientUtils.producerMetrics(Collections.singleton(threadProducer));
     }
 
     String producerClientIds() {
-        return threadProducerClientId(threadId);
+        return producerClientId(threadId);
     }
 
     private LogContext getLogContext(final TaskId taskId) {
@@ -324,11 +309,6 @@ class ActiveTaskCreator {
     }
 
     public double totalProducerBlockedTime() {
-        if (threadProducer != null) {
-            return threadProducer.totalBlockedTime();
-        }
-        return taskProducers.values().stream()
-            .mapToDouble(StreamsProducer::totalBlockedTime)
-            .sum();
+        return threadProducer.totalBlockedTime();
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index 0fb9a17d2d2..87a25e65b62 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -64,7 +64,6 @@ public class ClientUtils {
         }
     }
 
-
     public static String adminClientId(final String clientId) {
         return clientId + "-admin";
     }
@@ -77,7 +76,7 @@ public class ClientUtils {
         return threadClientId + "-restore-consumer";
     }
 
-    public static String threadProducerClientId(final String threadClientId) {
+    public static String producerClientId(final String threadClientId) {
         return threadClientId + "-producer";
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index a79e4073b60..79074d20dcb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -75,7 +75,6 @@ public class RecordCollectorImpl implements RecordCollector {
     private final TaskId taskId;
     private final StreamsProducer streamsProducer;
     private final ProductionExceptionHandler productionExceptionHandler;
-    private final boolean eosEnabled;
     private final Map<TopicPartition, Long> offsets;
 
     private final StreamsMetricsImpl streamsMetrics;
@@ -98,7 +97,6 @@ public class RecordCollectorImpl implements RecordCollector {
         this.streamsProducer = streamsProducer;
         this.sendException = streamsProducer.sendException();
         this.productionExceptionHandler = productionExceptionHandler;
-        this.eosEnabled = streamsProducer.eosEnabled();
         this.streamsMetrics = streamsMetrics;
 
         final String threadId = Thread.currentThread().getName();
@@ -121,7 +119,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
 
     @Override
     public void initialize() {
-        if (eosEnabled) {
+        if (streamsProducer.eosEnabled()) {
             streamsProducer.initTransaction();
         }
     }
@@ -528,7 +526,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
     public void closeDirty() {
         log.info("Closing record collector dirty");
 
-        if (eosEnabled) {
+        if (streamsProducer.eosEnabled()) {
             // We may be closing dirty because the commit failed, so we must 
abort the transaction to be safe
             streamsProducer.abortTransaction();
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 864f7e3ed26..5c58fce1e99 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -408,6 +408,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             time,
             clientSupplier,
             threadId,
+            threadIdx,
             processId,
             log,
             stateUpdaterEnabled,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 8ce954a7c35..98960dc2b6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -37,8 +37,6 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KafkaClientSupplier;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
@@ -49,13 +47,11 @@ import org.slf4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
-import static 
org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;
 
 /**
  * {@code StreamsProducer} manages the producers within a Kafka Streams 
application.
@@ -69,8 +65,6 @@ public class StreamsProducer {
     private final Logger log;
     private final String logPrefix;
 
-    private final Map<String, Object> eosV2ProducerConfigs;
-    private final KafkaClientSupplier clientSupplier;
     private final ProcessingMode processingMode;
     private final Time time;
 
@@ -80,48 +74,15 @@ public class StreamsProducer {
     private double oldProducerTotalBlockedTime = 0;
     private final AtomicReference<KafkaException> sendException = new 
AtomicReference<>(null);
 
-    public StreamsProducer(final StreamsConfig config,
-                           final String threadId,
-                           final KafkaClientSupplier clientSupplier,
-                           final UUID processId,
+    public StreamsProducer(final ProcessingMode processingMode,
+                           final Producer<byte[], byte[]> producer,
                            final LogContext logContext,
                            final Time time) {
-        Objects.requireNonNull(config, "config cannot be null");
-        Objects.requireNonNull(threadId, "threadId cannot be null");
-        this.clientSupplier = Objects.requireNonNull(clientSupplier, 
"clientSupplier cannot be null");
+        this.processingMode = Objects.requireNonNull(processingMode, 
"processingMode cannot be null");
+        this.producer = Objects.requireNonNull(producer, "producer cannot be 
null");
         log = Objects.requireNonNull(logContext, "logContext cannot be 
null").logger(getClass());
         logPrefix = logContext.logPrefix().trim();
-        this.time = Objects.requireNonNull(time, "time");
-
-        processingMode = StreamsConfigUtils.processingMode(config);
-
-        final Map<String, Object> producerConfigs;
-        switch (processingMode) {
-            case AT_LEAST_ONCE: {
-                producerConfigs = 
config.getProducerConfigs(threadProducerClientId(threadId));
-                eosV2ProducerConfigs = null;
-
-                break;
-            }
-            case EXACTLY_ONCE_V2: {
-                producerConfigs = 
config.getProducerConfigs(threadProducerClientId(threadId));
-
-                final String applicationId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-                producerConfigs.put(
-                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                    applicationId + "-" +
-                        Objects.requireNonNull(processId, "processId cannot be 
null for exactly-once v2") +
-                        "-" + threadId.split("-StreamThread-")[1]);
-
-                eosV2ProducerConfigs = producerConfigs;
-
-                break;
-            }
-            default:
-                throw new IllegalArgumentException("Unknown processing mode: " 
+ processingMode);
-        }
-
-        producer = clientSupplier.getProducer(producerConfigs);
+        this.time = Objects.requireNonNull(time, "time cannot be null");
     }
 
     private String formatException(final String message) {
@@ -171,18 +132,18 @@ public class StreamsProducer {
         }
     }
 
-    public void resetProducer() {
+    public void resetProducer(final Producer<byte[], byte[]> producer) {
         if (processingMode != EXACTLY_ONCE_V2) {
             throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
         }
 
-        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        oldProducerTotalBlockedTime += totalBlockedTime(this.producer);
         final long start = time.nanoseconds();
         close();
         final long closeTime = time.nanoseconds() - start;
         oldProducerTotalBlockedTime += closeTime;
 
-        producer = clientSupplier.getProducer(eosV2ProducerConfigs);
+        this.producer = producer;
     }
 
     private double getMetricValue(final Map<MetricName, ? extends Metric> 
metrics,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 9625c4b6d61..ffb1e449622 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -103,7 +103,7 @@ public class StreamsConfigTest {
 
     @BeforeEach
     public void setUp() {
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index c1f58b3972d..e2d5bf74687 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -310,6 +310,7 @@ public class ActiveTaskCreatorTest {
             new MockTime(),
             mockClientSupplier,
             "clientId-StreamThread-0",
+            0,
             uuid,
             new LogContext().logger(ActiveTaskCreator.class),
             false,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 353289fc72b..912a537b6fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
@@ -56,6 +55,7 @@ import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationE
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -78,7 +78,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -141,7 +140,6 @@ public class RecordCollectorTest {
 
     private final StringSerializer stringSerializer = new StringSerializer();
     private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
-    private final UUID processId = UUID.randomUUID();
 
     private final StreamPartitioner<String, Object> streamPartitioner =
         (topic, key, value, numPartitions) -> 
Optional.of(Collections.singleton(Integer.parseInt(key) % numPartitions));
@@ -157,15 +155,13 @@ public class RecordCollectorTest {
     public void setup() {
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.setCluster(cluster);
+        mockProducer = (MockProducer<byte[], byte[]>) 
clientSupplier.getProducer(config.originals());
         streamsProducer = new StreamsProducer(
-            config,
-            processId + "-StreamThread-1",
-            clientSupplier,
-            processId,
+            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+            mockProducer,
             logContext,
             Time.SYSTEM
         );
-        mockProducer = clientSupplier.producers.get(0);
         final SinkNode<?, ?> sinkNode = new SinkNode<>(
             sinkNodeName,
             new StaticTopicNameExtractor<>(topic),
@@ -734,7 +730,6 @@ public class RecordCollectorTest {
     @Test
     public void shouldForwardFlushToStreamsProducer() {
         final StreamsProducer streamsProducer = mock(StreamsProducer.class);
-        when(streamsProducer.eosEnabled()).thenReturn(false);
         doNothing().when(streamsProducer).flush();
         when(streamsProducer.sendException()).thenReturn(new 
AtomicReference<>(null));
         final ProcessorTopology topology = mock(ProcessorTopology.class);
@@ -755,7 +750,6 @@ public class RecordCollectorTest {
     @Test
     public void shouldForwardFlushToStreamsProducerEosEnabled() {
         final StreamsProducer streamsProducer = mock(StreamsProducer.class);
-        when(streamsProducer.eosEnabled()).thenReturn(true);
         when(streamsProducer.sendException()).thenReturn(new 
AtomicReference<>(null));
         doNothing().when(streamsProducer).flush();
         final ProcessorTopology topology = mock(ProcessorTopology.class);
@@ -784,7 +778,6 @@ public class RecordCollectorTest {
 
     private void shouldClearOffsetsOnClose(final boolean clean) {
         final StreamsProducer streamsProducer = mock(StreamsProducer.class);
-        when(streamsProducer.eosEnabled()).thenReturn(true);
         when(streamsProducer.sendException()).thenReturn(new 
AtomicReference<>(null));
         final long offset = 1234L;
         final RecordMetadata metadata = new RecordMetadata(
@@ -836,7 +829,6 @@ public class RecordCollectorTest {
     @Test
     public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
         final StreamsProducer streamsProducer = mock(StreamsProducer.class);
-        when(streamsProducer.eosEnabled()).thenReturn(true);
         when(streamsProducer.sendException()).thenReturn(new 
AtomicReference<>(null));
         final ProcessorTopology topology = mock(ProcessorTopology.class);
         
@@ -1449,20 +1441,13 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             new StreamsProducer(
-                eosConfig,
-                "-StreamThread-1",
-                new MockClientSupplier() {
+                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+                new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
                     @Override
-                    public Producer<byte[], byte[]> getProducer(final 
Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
-                            @Override
-                            public void abortTransaction() {
-                                functionCalled.set(true);
-                            }
-                        };
+                    public void abortTransaction() {
+                        functionCalled.set(true);
                     }
                 },
-                processId,
                 logContext,
                 Time.SYSTEM
             ),
@@ -1481,20 +1466,13 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             new StreamsProducer(
-                config,
-                processId + "-StreamThread-1",
-                new MockClientSupplier() {
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+                new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
                     @Override
-                    public Producer<byte[], byte[]> getProducer(final 
Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
-                            @Override
-                            public List<PartitionInfo> partitionsFor(final 
String topic) {
-                                return Collections.emptyList();
-                            }
-                        };
+                    public List<PartitionInfo> partitionsFor(final String 
topic) {
+                        return Collections.emptyList();
                     }
                 },
-                null,
                 logContext,
                 Time.SYSTEM
             ),
@@ -1521,15 +1499,8 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             new StreamsProducer(
-                eosConfig,
-                processId + "-StreamThread-1",
-                new MockClientSupplier() {
-                    @Override
-                    public Producer<byte[], byte[]> getProducer(final 
Map<String, Object> config) {
-                        return mockProducer;
-                    }
-                },
-                processId,
+                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+                mockProducer,
                 logContext,
                 Time.SYSTEM
             ),
@@ -1924,21 +1895,14 @@ public class RecordCollectorTest {
 
     private StreamsProducer getExceptionalStreamsProducerOnSend(final 
Exception exception) {
         return new StreamsProducer(
-            config,
-            processId + "-StreamThread-1",
-            new MockClientSupplier() {
+            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+            new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
                 @Override
-                public Producer<byte[], byte[]> getProducer(final Map<String, 
Object> config) {
-                    return new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
-                        @Override
-                        public synchronized Future<RecordMetadata> send(final 
ProducerRecord<byte[], byte[]> record, final Callback callback) {
-                            callback.onCompletion(null, exception);
-                            return null;
-                        }
-                    };
+                public synchronized Future<RecordMetadata> send(final 
ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                    callback.onCompletion(null, exception);
+                    return null;
                 }
             },
-            null,
             logContext,
             Time.SYSTEM
         );
@@ -1946,20 +1910,13 @@ public class RecordCollectorTest {
 
     private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final 
RuntimeException exception) {
         return new StreamsProducer(
-            config,
-            processId + "-StreamThread-1",
-            new MockClientSupplier() {
+            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+            new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
                 @Override
-                public Producer<byte[], byte[]> getProducer(final Map<String, 
Object> config) {
-                    return new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
-                        @Override
-                        public synchronized List<PartitionInfo> 
partitionsFor(final String topic) {
-                            throw exception;
-                        }
-                    };
+                public synchronized List<PartitionInfo> partitionsFor(final 
String topic) {
+                    throw exception;
                 }
             },
-            null,
             logContext,
             Time.SYSTEM
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index f914a5272c4..6f2d37b3b3a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -37,10 +37,10 @@ import 
org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.test.MockClientSupplier;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -63,8 +63,6 @@ import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -111,25 +109,15 @@ public class StreamsProducerTest {
 
     @SuppressWarnings("unchecked")
     final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
-    final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
-        @Override
-        public Producer<byte[], byte[]> getProducer(final Map<String, Object> 
config) {
-            return mockedProducer;
-        }
-    };
     final StreamsProducer streamsProducerWithMock = new StreamsProducer(
-        nonEosConfig,
-        "threadId",
-        clientSupplier,
-        null,
+        StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+        mockedProducer,
         logContext,
         mockTime
     );
     final StreamsProducer eosStreamsProducerWithMock = new StreamsProducer(
-        eosConfig,
-        "threadId-StreamThread-0",
-        clientSupplier,
-        UUID.randomUUID(),
+        StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+        mockedProducer,
         logContext,
         mockTime
     );
@@ -152,30 +140,29 @@ public class StreamsProducerTest {
     @BeforeEach
     public void before() {
         mockClientSupplier.setCluster(cluster);
+        nonEosMockProducer = (MockProducer<byte[], byte[]>) 
mockClientSupplier.getProducer(nonEosConfig.originals());
         nonEosStreamsProducer =
             new StreamsProducer(
-                nonEosConfig,
-                "threadId-StreamThread-0",
-                mockClientSupplier,
-                null,
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+                nonEosMockProducer,
                 logContext,
                 mockTime
             );
-        nonEosMockProducer = mockClientSupplier.producers.get(0);
 
         eosMockClientSupplier.setCluster(cluster);
         eosMockClientSupplier.setApplicationIdForProducer("appId");
+        final String clientId = "threadId-StreamThread-0";
+        final Map<String, Object> producerConfig = 
eosConfig.getProducerConfigs(clientId);
+        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + 
UUID.randomUUID() + "-0");
+        eosMockProducer = (MockProducer<byte[], byte[]>) 
eosMockClientSupplier.getProducer(producerConfig);
         eosStreamsProducer =
             new StreamsProducer(
-                eosConfig,
-                "threadId-StreamThread-0",
-                eosMockClientSupplier,
-                UUID.randomUUID(),
+                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+                eosMockProducer,
                 logContext,
                 mockTime
             );
         eosStreamsProducer.initTransaction();
-        eosMockProducer = eosMockClientSupplier.producers.get(0);
         when(mockTime.nanoseconds()).thenReturn(Time.SYSTEM.nanoseconds());
     }
 
@@ -202,12 +189,11 @@ public class StreamsProducerTest {
     @Test
     public void shouldResetTransactionInFlightOnReset() {
         // given:
-        eosStreamsProducer.send(
-            new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { 
});
+        eosStreamsProducer.send(new ProducerRecord<>("topic", new byte[1]), 
(metadata, error) -> { });
         assertThat(eosStreamsProducer.transactionInFlight(), is(true));
 
         // when:
-        eosStreamsProducer.resetProducer();
+        eosStreamsProducer.resetProducer(null);
 
         // then:
         assertThat(eosStreamsProducer.transactionInFlight(), is(false));
@@ -252,74 +238,66 @@ public class StreamsProducerTest {
     // error handling tests
 
     @Test
-    public void shouldFailIfStreamsConfigIsNull() {
+    public void shouldFailIfProcessingModeIsNull() {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
                 null,
-                "threadId-StreamThread-0",
-                mockClientSupplier,
-                UUID.randomUUID(),
+                mockedProducer,
                 logContext,
                 mockTime)
         );
 
-        assertThat(thrown.getMessage(), is("config cannot be null"));
+        assertThat(thrown.getMessage(), is("processingMode cannot be null"));
     }
 
     @Test
-    public void shouldFailIfThreadIdIsNull() {
+    public void shouldFailIfProducerIsNull() {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                nonEosConfig,
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 null,
-                mockClientSupplier,
-                UUID.randomUUID(),
                 logContext,
                 mockTime)
         );
 
-        assertThat(thrown.getMessage(), is("threadId cannot be null"));
+        assertThat(thrown.getMessage(), is("producer cannot be null"));
     }
 
     @Test
-    public void shouldFailIfClientSupplierIsNull() {
+    public void shouldFailIfLogContextIsNull() {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                nonEosConfig,
-                "threadId-StreamThread-0",
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+                mockedProducer,
                 null,
-                UUID.randomUUID(),
-                logContext,
                 mockTime)
         );
 
-        assertThat(thrown.getMessage(), is("clientSupplier cannot be null"));
+        assertThat(thrown.getMessage(), is("logContext cannot be null"));
     }
 
     @Test
-    public void shouldFailIfLogContextIsNull() {
+    public void shouldFailIfTimeIsNull() {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                nonEosConfig,
-                "threadId-StreamThread-0",
-                mockClientSupplier,
-                UUID.randomUUID(),
-                null,
-                mockTime)
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+                mockedProducer,
+                logContext,
+                null)
         );
 
-        assertThat(thrown.getMessage(), is("logContext cannot be null"));
+        assertThat(thrown.getMessage(), is("time cannot be null"));
     }
 
     @Test
     public void shouldFailOnResetProducerForAtLeastOnce() {
         final IllegalStateException thrown = assertThrows(
             IllegalStateException.class,
-            () -> nonEosStreamsProducer.resetProducer()
+            () -> nonEosStreamsProducer.resetProducer(null)
         );
 
         assertThat(thrown.getMessage(), is("Expected eos-v2 to be enabled, but 
the processing mode was AT_LEAST_ONCE"));
@@ -330,25 +308,6 @@ public class StreamsProducerTest {
 
     // functional tests
 
-    @Test
-    public void shouldNotSetTransactionIdIfEosDisabled() {
-        final Map<String, Object> producerConfig = new HashMap<>();
-        final StreamsConfig mockConfig = mock(StreamsConfig.class);
-        
when(mockConfig.getProducerConfigs("threadId-producer")).thenReturn(producerConfig);
-        
when(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).thenReturn(StreamsConfig.AT_LEAST_ONCE);
-
-        new StreamsProducer(
-            mockConfig,
-            "threadId",
-            mockClientSupplier,
-            null,
-            logContext,
-            mockTime
-        );
-
-        
assertFalse(producerConfig.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
-    }
-
     @Test
     public void shouldNotHaveEosEnabledIfEosDisabled() {
         assertThat(nonEosStreamsProducer.eosEnabled(), is(false));
@@ -439,27 +398,6 @@ public class StreamsProducerTest {
         assertThat(eosStreamsProducer.eosEnabled(), is(true));
     }
 
-    @Test
-    public void shouldSetTransactionIdUsingProcessIdIfEosV2Enabled() {
-        final UUID processId = UUID.randomUUID();
-        final Map<String, Object> producerConfig = new HashMap<>();
-        final StreamsConfig mockConfig = mock(StreamsConfig.class);
-        
when(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).thenReturn(producerConfig);
-        
when(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("appId");
-        
when(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).thenReturn(StreamsConfig.EXACTLY_ONCE_V2);
-
-        new StreamsProducer(
-            mockConfig,
-            "threadId-StreamThread-0",
-            eosMockClientSupplier,
-            processId,
-            logContext,
-            mockTime
-        );
-
-        assertEquals("appId-" + processId + "-0", 
producerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
-    }
-
     @Test
     public void shouldHaveEosEnabledIfEosEnabled() {
         assertThat(eosStreamsProducer.eosEnabled(), is(true));
@@ -531,10 +469,8 @@ public class StreamsProducerTest {
         when(mockedProducer.send(record, null)).thenReturn(null);
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            eosConfig,
-            "threadId-StreamThread-0",
-            clientSupplier,
-            UUID.randomUUID(),
+            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+            mockedProducer,
             logContext,
             mockTime
         );
@@ -576,38 +512,14 @@ public class StreamsProducerTest {
 
     // error handling tests
 
-    @Test
-    public void shouldFailIfProcessIdNullForEos() {
-        final NullPointerException thrown = assertThrows(
-            NullPointerException.class,
-            () -> new StreamsProducer(
-                eosConfig,
-                "threadId",
-                mockClientSupplier,
-                null,
-                logContext,
-                mockTime)
-        );
-
-        assertThat(thrown.getMessage(), is("processId cannot be null for 
exactly-once v2"));
-    }
-
     @Test
     public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() {
         // use `nonEosMockProducer` instead of `eosMockProducer` to avoid 
double Tx-Init
         nonEosMockProducer.initTransactionException = new 
TimeoutException("KABOOM!");
-        final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
-            @Override
-            public Producer<byte[], byte[]> getProducer(final Map<String, 
Object> config) {
-                return nonEosMockProducer;
-            }
-        };
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            eosConfig,
-            "threadId-StreamThread-0",
-            clientSupplier,
-            UUID.randomUUID(),
+            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+            nonEosMockProducer,
             logContext,
             mockTime
         );
@@ -622,12 +534,11 @@ public class StreamsProducerTest {
 
     @Test
     public void 
shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForEos() {
+        // use `nonEosMockProducer` instead of `eosMockProducer` to avoid 
auto-init Tx
         final StreamsProducer streamsProducer =
             new StreamsProducer(
-                eosConfig,
-                "threadId-StreamThread-0",
-                eosMockClientSupplier,
-                UUID.randomUUID(),
+                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+                nonEosMockProducer,
                 logContext,
                 mockTime
             );
@@ -644,18 +555,10 @@ public class StreamsProducerTest {
     public void shouldThrowStreamsExceptionOnEosInitError() {
         // use `nonEosMockProducer` instead of `eosMockProducer` to avoid 
double Tx-Init
         nonEosMockProducer.initTransactionException = new 
KafkaException("KABOOM!");
-        final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
-            @Override
-            public Producer<byte[], byte[]> getProducer(final Map<String, 
Object> config) {
-                return nonEosMockProducer;
-            }
-        };
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            eosConfig,
-            "threadId-StreamThread-0",
-            clientSupplier,
-            UUID.randomUUID(),
+            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+            nonEosMockProducer,
             logContext,
             mockTime
         );
@@ -673,18 +576,10 @@ public class StreamsProducerTest {
     public void shouldFailOnEosInitFatal() {
         // use `nonEosMockProducer` instead of `eosMockProducer` to avoid 
double Tx-Init
         nonEosMockProducer.initTransactionException = new 
RuntimeException("KABOOM!");
-        final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
-            @Override
-            public Producer<byte[], byte[]> getProducer(final Map<String, 
Object> config) {
-                return nonEosMockProducer;
-            }
-        };
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            eosConfig,
-            "threadId-StreamThread-0",
-            clientSupplier,
-            UUID.randomUUID(),
+            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+            nonEosMockProducer,
             logContext,
             mockTime
         );
@@ -986,26 +881,25 @@ public class StreamsProducerTest {
 
     @Test
     public void shouldCloseExistingProducerOnResetProducer() {
-        eosStreamsProducer.resetProducer();
+        eosStreamsProducer.resetProducer(null);
 
         assertTrue(eosMockProducer.closed());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldSetNewProducerOnResetProducer() {
-        eosStreamsProducer.resetProducer();
+        final Producer<byte[], byte[]> newProducer = mock(Producer.class);
+        eosStreamsProducer.resetProducer(newProducer);
 
-        assertThat(eosMockClientSupplier.producers.size(), is(2));
-        assertThat(eosStreamsProducer.kafkaProducer(), 
is(eosMockClientSupplier.producers.get(1)));
+        assertThat(eosStreamsProducer.kafkaProducer(), is(newProducer));
     }
 
     @Test
     public void shouldResetTransactionInitializedOnResetProducer() {
         final StreamsProducer streamsProducer = new StreamsProducer(
-            eosConfig,
-            "threadId-StreamThread-0",
-            clientSupplier,
-            UUID.randomUUID(),
+            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
+            mockedProducer,
             logContext,
             mockTime
         );
@@ -1013,7 +907,7 @@ public class StreamsProducerTest {
 
         when(mockedProducer.metrics()).thenReturn(Collections.emptyMap());
 
-        streamsProducer.resetProducer();
+        streamsProducer.resetProducer(mockedProducer);
         streamsProducer.initTransaction();
 
         verify(mockedProducer).close();
@@ -1064,9 +958,9 @@ public class StreamsProducerTest {
         final long closeStart = 1L;
         final long clodeDelay = 1L;
         
when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + 
clodeDelay);
-        eosStreamsProducer.resetProducer();
+        eosStreamsProducer.resetProducer(eosMockProducer);
         setProducerMetrics(
-            eosMockClientSupplier.producers.get(1),
+            eosMockProducer,
             BUFFER_POOL_WAIT_TIME,
             FLUSH_TME,
             TXN_INIT_TIME,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0fac1867c43..6c264701f80 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -26,6 +27,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -41,7 +43,6 @@ import 
org.apache.kafka.streams.processor.internals.StreamsProducer;
 import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockRocksDbConfigSetter;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
@@ -212,10 +213,8 @@ public class KeyValueStoreTestDriver<K, V> {
             logContext,
             new TaskId(0, 0),
             new StreamsProducer(
-                new StreamsConfig(props),
-                "threadId",
-                new MockClientSupplier(),
-                null,
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+                new MockProducer<>(null, true, null, null),
                 logContext,
                 Time.SYSTEM),
             new DefaultProductionExceptionHandler(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c947bcb926b..cfebdcf220a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -81,7 +81,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.UUID;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
@@ -434,10 +433,8 @@ public class StreamThreadStateStoreProviderTest {
             logContext,
             taskId,
             new StreamsProducer(
-                streamsConfig,
-                "threadId",
-                clientSupplier,
-                UUID.randomUUID(),
+                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
+                clientSupplier.getProducer(streamsConfig.originals()),
                 logContext,
                 Time.SYSTEM
             ),
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 884fa2c7dda..6c797a0a280 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -109,7 +109,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Queue;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -347,28 +346,8 @@ public class TopologyTestDriver implements Closeable {
             }
         };
         testDriverProducer = new TestDriverProducer(
-            streamsConfig,
-            new KafkaClientSupplier() {
-                @Override
-                public Producer<byte[], byte[]> getProducer(final Map<String, 
Object> config) {
-                    return producer;
-                }
-
-                @Override
-                public Consumer<byte[], byte[]> getConsumer(final Map<String, 
Object> config) {
-                    throw new IllegalStateException();
-                }
-
-                @Override
-                public Consumer<byte[], byte[]> getRestoreConsumer(final 
Map<String, Object> config) {
-                    throw new IllegalStateException();
-                }
-
-                @Override
-                public Consumer<byte[], byte[]> getGlobalConsumer(final 
Map<String, Object> config) {
-                    throw new IllegalStateException();
-                }
-            },
+            StreamsConfigUtils.processingMode(streamsConfig),
+            producer,
             logContext,
             mockWallClockTime
         );
@@ -1373,11 +1352,11 @@ public class TopologyTestDriver implements Closeable {
 
     private static class TestDriverProducer extends StreamsProducer {
 
-        public TestDriverProducer(final StreamsConfig config,
-                                  final KafkaClientSupplier clientSupplier,
+        public TestDriverProducer(final StreamsConfigUtils.ProcessingMode 
processingMode,
+                                  final Producer<byte[], byte[]> producer,
                                   final LogContext logContext,
                                   final Time time) {
-            super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, 
UUID.randomUUID(), logContext, time);
+            super(processingMode, producer, logContext, time);
         }
 
         @Override

Reply via email to