This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2b19baee04 KIP-16331: Remove EOS-v1 from StreamsConfigUtils (#17691)
f2b19baee04 is described below

commit f2b19baee04edb02ece7b2b7e4c6b50b1301ac9c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Nov 6 18:18:49 2024 -0800

    KIP-16331: Remove EOS-v1 from StreamsConfigUtils (#17691)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |   2 +-
 .../streams/internals/StreamsConfigUtils.java      |  32 ++----
 .../processor/internals/ActiveTaskCreator.java     |  11 +-
 .../streams/processor/internals/StreamThread.java  |   4 -
 .../processor/internals/StreamsProducer.java       |  19 ++--
 .../processor/internals/TopologyMetadata.java      |  14 +--
 .../processor/internals/RecordCollectorTest.java   |  47 ++++-----
 .../processor/internals/StreamsProducerTest.java   | 114 ++++++++++-----------
 .../internals/testutil/DummyStreamsConfig.java     |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java     |   9 +-
 .../StreamThreadStateStoreProviderTest.java        |   7 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  25 +++--
 12 files changed, 122 insertions(+), 166 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d24575a05d8..dafef7d7bc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1568,7 +1568,7 @@ public class StreamsConfig extends AbstractConfig {
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
 
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
-        if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
+        if (StreamsConfigUtils.eosEnabled(this)) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
         consumerProps.putAll(getClientCustomProps());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
index 4ac9792c228..8f1537ff6d3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
@@ -31,9 +31,6 @@ public class StreamsConfigUtils {
     public enum ProcessingMode {
         AT_LEAST_ONCE(StreamsConfig.AT_LEAST_ONCE),
 
-        // TODO cleanup
-        EXACTLY_ONCE_ALPHA("exactly_once"),
-
         EXACTLY_ONCE_V2(StreamsConfig.EXACTLY_ONCE_V2);
 
         public final String name;
@@ -41,38 +38,23 @@ public class StreamsConfigUtils {
         ProcessingMode(final String name) {
             this.name = name;
         }
+
+        @Override
+        public String toString() {
+            return name;
+        }
     }
 
-    // TODO cleanup
     public static ProcessingMode processingMode(final StreamsConfig config) {
-        if 
("exactly_once".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
 {
-            return ProcessingMode.EXACTLY_ONCE_ALPHA;
-        } else if 
("exactly_once_beta".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
 {
-            return ProcessingMode.EXACTLY_ONCE_V2;
-        } else if 
(StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
 {
+        if 
(StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
 {
             return ProcessingMode.EXACTLY_ONCE_V2;
         } else {
             return ProcessingMode.AT_LEAST_ONCE;
         }
     }
 
-    // TODO cleanup
-    public static String processingModeString(final ProcessingMode 
processingMode) {
-        if (processingMode == ProcessingMode.EXACTLY_ONCE_V2) {
-            return StreamsConfig.EXACTLY_ONCE_V2;
-        } else if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
-            return "exactly_once";
-        } else {
-            return StreamsConfig.AT_LEAST_ONCE;
-        }
-    }
-
     public static boolean eosEnabled(final StreamsConfig config) {
-        return eosEnabled(processingMode(config));
-    }
-
-    public static boolean eosEnabled(final ProcessingMode processingMode) {
-        return processingMode == ProcessingMode.EXACTLY_ONCE_V2;
+        return processingMode(config) == ProcessingMode.EXACTLY_ONCE_V2;
     }
 
     @SuppressWarnings("deprecation")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 16c864576dc..6c973e096fc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -63,7 +62,6 @@ class ActiveTaskCreator {
     private final Logger log;
     private final Sensor createTaskSensor;
     private final StreamsProducer streamsProducer;
-    private final ProcessingMode processingMode;
     private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
 
@@ -97,22 +95,21 @@ class ActiveTaskCreator {
         this.processingThreadsEnabled = processingThreadsEnabled;
 
         createTaskSensor = ThreadMetrics.createTaskSensor(threadId, 
streamsMetrics);
-        processingMode = processingMode(applicationConfig);
 
         final String threadIdPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
         final LogContext logContext = new LogContext(threadIdPrefix);
 
         streamsProducer = new StreamsProducer(
-            processingMode,
             producer(),
-            logContext,
-            time
+            processingMode(applicationConfig),
+            time,
+            logContext
         );
     }
 
     private Producer<byte[], byte[]> producer() {
         final Map<String, Object> producerConfig = 
applicationConfig.getProducerConfigs(producerClientId(threadId));
-        if (eosEnabled(processingMode)) {
+        if (eosEnabled(applicationConfig)) {
             producerConfig.put(
                 ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                 
applicationConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + 
processId + "-" + threadIdx
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e492d180175..88e599a92cd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -46,7 +46,6 @@ import org.apache.kafka.streams.ThreadMetadata;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import 
org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
 import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -81,7 +80,6 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
-import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.restoreConsumerClientId;
@@ -346,7 +344,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
     private final boolean eosEnabled;
-    private final StreamsConfigUtils.ProcessingMode processingMode;
     private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
 
@@ -649,7 +646,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
 
         this.numIterations = 1;
         this.eosEnabled = eosEnabled(config);
-        this.processingMode = processingMode(config);
         this.stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());
         this.processingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         this.logSummaryIntervalMs = 
config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 7cd072a1828..b7735655763 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 
 import org.slf4j.Logger;
@@ -76,15 +75,15 @@ public class StreamsProducer {
     // which we share across all tasks, ie, all `RecordCollectorImpl`
     private final AtomicReference<KafkaException> sendException = new 
AtomicReference<>(null);
 
-    public StreamsProducer(final ProcessingMode processingMode,
-                           final Producer<byte[], byte[]> producer,
-                           final LogContext logContext,
-                           final Time time) {
-        this.processingMode = Objects.requireNonNull(processingMode, 
"processingMode cannot be null");
+    public StreamsProducer(final Producer<byte[], byte[]> producer,
+                           final ProcessingMode processingMode,
+                           final Time time,
+                           final LogContext logContext) {
         this.producer = Objects.requireNonNull(producer, "producer cannot be 
null");
+        this.processingMode = Objects.requireNonNull(processingMode, 
"processingMode cannot be null");
+        this.time = Objects.requireNonNull(time, "time cannot be null");
         log = Objects.requireNonNull(logContext, "logContext cannot be 
null").logger(getClass());
         logPrefix = logContext.logPrefix().trim();
-        this.time = Objects.requireNonNull(time, "time cannot be null");
     }
 
     private String formatException(final String message) {
@@ -92,7 +91,7 @@ public class StreamsProducer {
     }
 
     boolean eosEnabled() {
-        return StreamsConfigUtils.eosEnabled(processingMode);
+        return processingMode == EXACTLY_ONCE_V2;
     }
 
     boolean transactionInFlight() {
@@ -135,8 +134,8 @@ public class StreamsProducer {
     }
 
     public void resetProducer(final Producer<byte[], byte[]> producer) {
-        if (processingMode != EXACTLY_ONCE_V2) {
-            throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
+        if (!eosEnabled()) {
+            throw new IllegalStateException("Expected EOS to be enabled, but 
processing mode is " + processingMode);
         }
 
         oldProducerTotalBlockedTime += totalBlockedTime(this.producer);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 62dc6929122..b1f6af32c91 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -274,11 +274,7 @@ public class TopologyMetadata {
      * @return A boolean indicating if the topology is paused.
      */
     public boolean isPaused(final String topologyName) {
-        if (topologyName == null) {
-            return pausedTopologies.contains(UNNAMED_TOPOLOGY);
-        } else {
-            return pausedTopologies.contains(topologyName);
-        }
+        return 
pausedTopologies.contains(getTopologyNameOrElseUnnamed(topologyName));
     }
 
     /**
@@ -463,7 +459,7 @@ public class TopologyMetadata {
 
         applyToEachBuilder(b -> {
             final String patternString = b.sourceTopicPatternString();
-            if (patternString.length() > 0) {
+            if (!patternString.isEmpty()) {
                 patternBuilder.append(patternString).append("|");
             }
         });
@@ -629,11 +625,7 @@ public class TopologyMetadata {
      *         else returns {@code null} if {@code topologyName} is non-null 
but no such NamedTopology exists
      */
     public InternalTopologyBuilder lookupBuilderForNamedTopology(final String 
topologyName) {
-        if (topologyName == null) {
-            return builders.get(UNNAMED_TOPOLOGY);
-        } else {
-            return builders.get(topologyName);
-        }
+        return builders.get(getTopologyNameOrElseUnnamed(topologyName));
     }
 
     private boolean evaluateConditionIsTrueForAnyBuilders(final 
Function<InternalTopologyBuilder, Boolean> condition) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index bc2db14389b..f7ec5784890 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -54,7 +54,6 @@ import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationE
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -88,6 +87,8 @@ import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -145,10 +146,10 @@ public class RecordCollectorTest {
     @BeforeEach
     public void setup() {
         streamsProducer = new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
             mockProducer,
-            logContext,
-            Time.SYSTEM
+            AT_LEAST_ONCE,
+            Time.SYSTEM,
+            logContext
         );
         final SinkNode<?, ?> sinkNode = new SinkNode<>(
             sinkNodeName,
@@ -1429,15 +1430,15 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
-                new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
+                new MockProducer<>(cluster, true, byteArraySerializer, 
byteArraySerializer) {
                     @Override
                     public void abortTransaction() {
                         functionCalled.set(true);
                     }
                 },
-                logContext,
-                Time.SYSTEM
+                EXACTLY_ONCE_V2,
+                Time.SYSTEM,
+                logContext
             ),
             productionExceptionHandler,
             streamsMetrics,
@@ -1454,15 +1455,15 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
-                new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
+                new MockProducer<>(cluster, true, byteArraySerializer, 
byteArraySerializer) {
                     @Override
                     public List<PartitionInfo> partitionsFor(final String 
topic) {
                         return Collections.emptyList();
                     }
                 },
-                logContext,
-                Time.SYSTEM
+                AT_LEAST_ONCE,
+                Time.SYSTEM,
+                logContext
             ),
             productionExceptionHandler,
             streamsMetrics,
@@ -1487,10 +1488,10 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
                 mockProducer,
-                logContext,
-                Time.SYSTEM
+                EXACTLY_ONCE_V2,
+                Time.SYSTEM,
+                logContext
             ),
             productionExceptionHandler,
             streamsMetrics,
@@ -1883,30 +1884,30 @@ public class RecordCollectorTest {
 
     private StreamsProducer getExceptionalStreamsProducerOnSend(final 
Exception exception) {
         return new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
-            new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
+            new MockProducer<>(cluster, true, byteArraySerializer, 
byteArraySerializer) {
                 @Override
                 public synchronized Future<RecordMetadata> send(final 
ProducerRecord<byte[], byte[]> record, final Callback callback) {
                     callback.onCompletion(null, exception);
                     return null;
                 }
             },
-            logContext,
-            Time.SYSTEM
+            AT_LEAST_ONCE,
+            Time.SYSTEM,
+            logContext
         );
     }
 
     private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final 
RuntimeException exception) {
         return new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
-            new MockProducer<byte[], byte[]>(cluster, true, 
byteArraySerializer, byteArraySerializer) {
+            new MockProducer<>(cluster, true, byteArraySerializer, 
byteArraySerializer) {
                 @Override
                 public synchronized List<PartitionInfo> partitionsFor(final 
String topic) {
                     throw exception;
                 }
             },
-            logContext,
-            Time.SYSTEM
+            AT_LEAST_ONCE,
+            Time.SYSTEM,
+            logContext
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index a08442ac26c..3fbf3b912da 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -55,6 +54,8 @@ import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
@@ -96,16 +97,16 @@ public class StreamsProducerTest {
     @SuppressWarnings("unchecked")
     final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
     private final StreamsProducer streamsProducerWithMock = new 
StreamsProducer(
-        StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
         mockedProducer,
-        logContext,
-        mockTime
+        AT_LEAST_ONCE,
+        mockTime,
+        logContext
     );
     private final StreamsProducer eosStreamsProducerWithMock = new 
StreamsProducer(
-        StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
         mockedProducer,
-        logContext,
-        mockTime
+        EXACTLY_ONCE_V2,
+        mockTime,
+        logContext
     );
 
     private final MockProducer<byte[], byte[]> nonEosMockProducer
@@ -128,18 +129,18 @@ public class StreamsProducerTest {
     public void before() {
         nonEosStreamsProducer =
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 nonEosMockProducer,
-                logContext,
-                mockTime
+                AT_LEAST_ONCE,
+                mockTime,
+                logContext
             );
 
         eosStreamsProducer =
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
                 eosMockProducer,
-                logContext,
-                mockTime
+                EXACTLY_ONCE_V2,
+                mockTime,
+                logContext
             );
         eosStreamsProducer.initTransaction();
         when(mockTime.nanoseconds()).thenReturn(Time.SYSTEM.nanoseconds());
@@ -216,10 +217,11 @@ public class StreamsProducerTest {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                null,
                 mockedProducer,
-                logContext,
-                mockTime)
+                null,
+                mockTime,
+                logContext
+            )
         );
 
         assertThat(thrown.getMessage(), is("processingMode cannot be null"));
@@ -230,41 +232,44 @@ public class StreamsProducerTest {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 null,
-                logContext,
-                mockTime)
+                AT_LEAST_ONCE,
+                mockTime,
+                logContext
+            )
         );
 
         assertThat(thrown.getMessage(), is("producer cannot be null"));
     }
 
     @Test
-    public void shouldFailIfLogContextIsNull() {
+    public void shouldFailIfTimeIsNull() {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 mockedProducer,
+                AT_LEAST_ONCE,
                 null,
-                mockTime)
+                logContext
+            )
         );
 
-        assertThat(thrown.getMessage(), is("logContext cannot be null"));
+        assertThat(thrown.getMessage(), is("time cannot be null"));
     }
 
     @Test
-    public void shouldFailIfTimeIsNull() {
+    public void shouldFailIfLogContextIsNull() {
         final NullPointerException thrown = assertThrows(
             NullPointerException.class,
             () -> new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 mockedProducer,
-                logContext,
-                null)
+                AT_LEAST_ONCE,
+                mockTime,
+                null
+            )
         );
 
-        assertThat(thrown.getMessage(), is("time cannot be null"));
+        assertThat(thrown.getMessage(), is("logContext cannot be null"));
     }
 
     @Test
@@ -274,7 +279,7 @@ public class StreamsProducerTest {
             () -> nonEosStreamsProducer.resetProducer(null)
         );
 
-        assertThat(thrown.getMessage(), is("Expected eos-v2 to be enabled, but 
the processing mode was AT_LEAST_ONCE"));
+        assertThat(thrown.getMessage(), is("Expected EOS to be enabled, but 
processing mode is at_least_once"));
     }
 
 
@@ -282,11 +287,6 @@ public class StreamsProducerTest {
 
     // functional tests
 
-    @Test
-    public void shouldNotHaveEosEnabledIfEosDisabled() {
-        assertThat(nonEosStreamsProducer.eosEnabled(), is(false));
-    }
-
     @Test
     public void shouldNotInitTxIfEosDisable() {
         assertThat(nonEosMockProducer.transactionInitialized(), is(false));
@@ -367,16 +367,6 @@ public class StreamsProducerTest {
 
     // functional tests
 
-    @Test
-    public void shouldEnableEosIfEosEnabled() {
-        assertThat(eosStreamsProducer.eosEnabled(), is(true));
-    }
-
-    @Test
-    public void shouldHaveEosEnabledIfEosEnabled() {
-        assertThat(eosStreamsProducer.eosEnabled(), is(true));
-    }
-
     @Test
     public void shouldInitTxOnEos() {
         assertThat(eosMockProducer.transactionInitialized(), is(true));
@@ -443,10 +433,10 @@ public class StreamsProducerTest {
         when(mockedProducer.send(record, null)).thenReturn(null);
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
             mockedProducer,
-            logContext,
-            mockTime
+            EXACTLY_ONCE_V2,
+            mockTime,
+            logContext
         );
         streamsProducer.initTransaction();
         // call `send()` to start a transaction
@@ -492,10 +482,10 @@ public class StreamsProducerTest {
         nonEosMockProducer.initTransactionException = new 
TimeoutException("KABOOM!");
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
             nonEosMockProducer,
-            logContext,
-            mockTime
+            EXACTLY_ONCE_V2,
+            mockTime,
+            logContext
         );
 
         final TimeoutException thrown = assertThrows(
@@ -511,10 +501,10 @@ public class StreamsProducerTest {
         // use `nonEosMockProducer` instead of `eosMockProducer` to avoid 
auto-init Tx
         final StreamsProducer streamsProducer =
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
                 nonEosMockProducer,
-                logContext,
-                mockTime
+                EXACTLY_ONCE_V2,
+                mockTime,
+                logContext
             );
 
         final IllegalStateException thrown = assertThrows(
@@ -531,10 +521,10 @@ public class StreamsProducerTest {
         nonEosMockProducer.initTransactionException = new 
KafkaException("KABOOM!");
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
             nonEosMockProducer,
-            logContext,
-            mockTime
+            EXACTLY_ONCE_V2,
+            mockTime,
+            logContext
         );
 
         final StreamsException thrown = assertThrows(
@@ -552,10 +542,10 @@ public class StreamsProducerTest {
         nonEosMockProducer.initTransactionException = new 
RuntimeException("KABOOM!");
 
         final StreamsProducer streamsProducer = new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
             nonEosMockProducer,
-            logContext,
-            mockTime
+            EXACTLY_ONCE_V2,
+            mockTime,
+            logContext
         );
 
         final RuntimeException thrown = assertThrows(
@@ -872,10 +862,10 @@ public class StreamsProducerTest {
     @Test
     public void shouldResetTransactionInitializedOnResetProducer() {
         final StreamsProducer streamsProducer = new StreamsProducer(
-            StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
             mockedProducer,
-            logContext,
-            mockTime
+            EXACTLY_ONCE_V2,
+            mockTime,
+            logContext
         );
         streamsProducer.initTransaction();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
index 23443930284..4843f65f1b5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
@@ -17,14 +17,12 @@
 package org.apache.kafka.streams.processor.internals.testutil;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 
 import java.util.Properties;
 
 public class DummyStreamsConfig extends StreamsConfig {
 
-
     public DummyStreamsConfig() {
         super(dummyProps(ProcessingMode.AT_LEAST_ONCE));
     }
@@ -37,7 +35,7 @@ public class DummyStreamsConfig extends StreamsConfig {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-application");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfigUtils.processingModeString(processingMode));
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
processingMode.toString());
         return props;
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 6c264701f80..0de1e1e606a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -58,6 +57,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -213,10 +213,11 @@ public class KeyValueStoreTestDriver<K, V> {
             logContext,
             new TaskId(0, 0),
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 new MockProducer<>(null, true, null, null),
-                logContext,
-                Time.SYSTEM),
+                AT_LEAST_ONCE,
+                Time.SYSTEM,
+                logContext
+            ),
             new DefaultProductionExceptionHandler(),
             new MockStreamsMetrics(new Metrics()),
             topology
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 7603cd63fc8..d1cdf53c3e4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -88,6 +88,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -451,10 +452,10 @@ public class StreamThreadStateStoreProviderTest {
             logContext,
             taskId,
             new StreamsProducer(
-                StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
                 producer,
-                logContext,
-                Time.SYSTEM
+                AT_LEAST_ONCE,
+                Time.SYSTEM,
+                logContext
             ),
             streamsConfig.productionExceptionHandler(),
             new MockStreamsMetrics(metrics),
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 5f9f140d088..08648805792 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -115,7 +116,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
-import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -339,17 +339,17 @@ public class TopologyTestDriver implements Closeable {
 
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-        producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, 
bytesSerializer) {
+        producer = new MockProducer<>(true, bytesSerializer, bytesSerializer) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 return Collections.singletonList(new PartitionInfo(topic, 
PARTITION_ID, null, null, null));
             }
         };
         testDriverProducer = new TestDriverProducer(
-            StreamsConfigUtils.processingMode(streamsConfig),
             producer,
-            logContext,
-            mockWallClockTime
+            StreamsConfigUtils.processingMode(streamsConfig),
+            mockWallClockTime,
+            logContext
         );
 
         setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics, 
cache);
@@ -1138,9 +1138,8 @@ public class TopologyTestDriver implements Closeable {
                          " {} configuration during 
TopologyTestDriver#close().",
                      StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
         }
-        if (processingMode == AT_LEAST_ONCE) {
-            producer.close();
-        }
+        producer.close();
+        consumer.close();
         stateDirectory.clean();
     }
 
@@ -1354,11 +1353,11 @@ public class TopologyTestDriver implements Closeable {
 
     private static class TestDriverProducer extends StreamsProducer {
 
-        public TestDriverProducer(final StreamsConfigUtils.ProcessingMode 
processingMode,
-                                  final Producer<byte[], byte[]> producer,
-                                  final LogContext logContext,
-                                  final Time time) {
-            super(processingMode, producer, logContext, time);
+        public TestDriverProducer(final Producer<byte[], byte[]> producer,
+                                  final ProcessingMode processingMode,
+                                  final Time time,
+                                  final LogContext logContext) {
+            super(producer, processingMode, time, logContext);
         }
 
         @Override


Reply via email to