This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f2b19baee04 KAFKA-16331: Remove EOS-v1 from StreamsConfigUtils (#17691)
f2b19baee04 is described below
commit f2b19baee04edb02ece7b2b7e4c6b50b1301ac9c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Nov 6 18:18:49 2024 -0800
KAFKA-16331: Remove EOS-v1 from StreamsConfigUtils (#17691)
Reviewers: Bill Bejeck <[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../streams/internals/StreamsConfigUtils.java | 32 ++----
.../processor/internals/ActiveTaskCreator.java | 11 +-
.../streams/processor/internals/StreamThread.java | 4 -
.../processor/internals/StreamsProducer.java | 19 ++--
.../processor/internals/TopologyMetadata.java | 14 +--
.../processor/internals/RecordCollectorTest.java | 47 ++++-----
.../processor/internals/StreamsProducerTest.java | 114 ++++++++++-----------
.../internals/testutil/DummyStreamsConfig.java | 4 +-
.../streams/state/KeyValueStoreTestDriver.java | 9 +-
.../StreamThreadStateStoreProviderTest.java | 7 +-
.../apache/kafka/streams/TopologyTestDriver.java | 25 +++--
12 files changed, 122 insertions(+), 166 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d24575a05d8..dafef7d7bc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1568,7 +1568,7 @@ public class StreamsConfig extends AbstractConfig {
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ?
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
- if (StreamsConfigUtils.processingMode(this) ==
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
+ if (StreamsConfigUtils.eosEnabled(this)) {
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
}
consumerProps.putAll(getClientCustomProps());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
index 4ac9792c228..8f1537ff6d3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
@@ -31,9 +31,6 @@ public class StreamsConfigUtils {
public enum ProcessingMode {
AT_LEAST_ONCE(StreamsConfig.AT_LEAST_ONCE),
- // TODO cleanup
- EXACTLY_ONCE_ALPHA("exactly_once"),
-
EXACTLY_ONCE_V2(StreamsConfig.EXACTLY_ONCE_V2);
public final String name;
@@ -41,38 +38,23 @@ public class StreamsConfigUtils {
ProcessingMode(final String name) {
this.name = name;
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
- // TODO cleanup
public static ProcessingMode processingMode(final StreamsConfig config) {
- if
("exactly_once".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
{
- return ProcessingMode.EXACTLY_ONCE_ALPHA;
- } else if
("exactly_once_beta".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
{
- return ProcessingMode.EXACTLY_ONCE_V2;
- } else if
(StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
{
+ if
(StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
{
return ProcessingMode.EXACTLY_ONCE_V2;
} else {
return ProcessingMode.AT_LEAST_ONCE;
}
}
- // TODO cleanup
- public static String processingModeString(final ProcessingMode
processingMode) {
- if (processingMode == ProcessingMode.EXACTLY_ONCE_V2) {
- return StreamsConfig.EXACTLY_ONCE_V2;
- } else if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
- return "exactly_once";
- } else {
- return StreamsConfig.AT_LEAST_ONCE;
- }
- }
-
public static boolean eosEnabled(final StreamsConfig config) {
- return eosEnabled(processingMode(config));
- }
-
- public static boolean eosEnabled(final ProcessingMode processingMode) {
- return processingMode == ProcessingMode.EXACTLY_ONCE_V2;
+ return processingMode(config) == ProcessingMode.EXACTLY_ONCE_V2;
}
@SuppressWarnings("deprecation")
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 16c864576dc..6c973e096fc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -63,7 +62,6 @@ class ActiveTaskCreator {
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer streamsProducer;
- private final ProcessingMode processingMode;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@@ -97,22 +95,21 @@ class ActiveTaskCreator {
this.processingThreadsEnabled = processingThreadsEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId,
streamsMetrics);
- processingMode = processingMode(applicationConfig);
final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
final LogContext logContext = new LogContext(threadIdPrefix);
streamsProducer = new StreamsProducer(
- processingMode,
producer(),
- logContext,
- time
+ processingMode(applicationConfig),
+ time,
+ logContext
);
}
private Producer<byte[], byte[]> producer() {
final Map<String, Object> producerConfig =
applicationConfig.getProducerConfigs(producerClientId(threadId));
- if (eosEnabled(processingMode)) {
+ if (eosEnabled(applicationConfig)) {
producerConfig.put(
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
applicationConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-" +
processId + "-" + threadIdx
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e492d180175..88e599a92cd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -46,7 +46,6 @@ import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import
org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -81,7 +80,6 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
-import static
org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.restoreConsumerClientId;
@@ -346,7 +344,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final boolean eosEnabled;
- private final StreamsConfigUtils.ProcessingMode processingMode;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@@ -649,7 +646,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
- this.processingMode = processingMode(config);
this.stateUpdaterEnabled =
InternalConfig.stateUpdaterEnabled(config.originals());
this.processingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
this.logSummaryIntervalMs =
config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 7cd072a1828..b7735655763 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.slf4j.Logger;
@@ -76,15 +75,15 @@ public class StreamsProducer {
// which we share across all tasks, ie, all `RecordCollectorImpl`
private final AtomicReference<KafkaException> sendException = new
AtomicReference<>(null);
- public StreamsProducer(final ProcessingMode processingMode,
- final Producer<byte[], byte[]> producer,
- final LogContext logContext,
- final Time time) {
- this.processingMode = Objects.requireNonNull(processingMode,
"processingMode cannot be null");
+ public StreamsProducer(final Producer<byte[], byte[]> producer,
+ final ProcessingMode processingMode,
+ final Time time,
+ final LogContext logContext) {
this.producer = Objects.requireNonNull(producer, "producer cannot be
null");
+ this.processingMode = Objects.requireNonNull(processingMode,
"processingMode cannot be null");
+ this.time = Objects.requireNonNull(time, "time cannot be null");
log = Objects.requireNonNull(logContext, "logContext cannot be
null").logger(getClass());
logPrefix = logContext.logPrefix().trim();
- this.time = Objects.requireNonNull(time, "time cannot be null");
}
private String formatException(final String message) {
@@ -92,7 +91,7 @@ public class StreamsProducer {
}
boolean eosEnabled() {
- return StreamsConfigUtils.eosEnabled(processingMode);
+ return processingMode == EXACTLY_ONCE_V2;
}
boolean transactionInFlight() {
@@ -135,8 +134,8 @@ public class StreamsProducer {
}
public void resetProducer(final Producer<byte[], byte[]> producer) {
- if (processingMode != EXACTLY_ONCE_V2) {
- throw new IllegalStateException("Expected eos-v2 to be enabled,
but the processing mode was " + processingMode);
+ if (!eosEnabled()) {
+ throw new IllegalStateException("Expected EOS to be enabled, but
processing mode is " + processingMode);
}
oldProducerTotalBlockedTime += totalBlockedTime(this.producer);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 62dc6929122..b1f6af32c91 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -274,11 +274,7 @@ public class TopologyMetadata {
* @return A boolean indicating if the topology is paused.
*/
public boolean isPaused(final String topologyName) {
- if (topologyName == null) {
- return pausedTopologies.contains(UNNAMED_TOPOLOGY);
- } else {
- return pausedTopologies.contains(topologyName);
- }
+ return
pausedTopologies.contains(getTopologyNameOrElseUnnamed(topologyName));
}
/**
@@ -463,7 +459,7 @@ public class TopologyMetadata {
applyToEachBuilder(b -> {
final String patternString = b.sourceTopicPatternString();
- if (patternString.length() > 0) {
+ if (!patternString.isEmpty()) {
patternBuilder.append(patternString).append("|");
}
});
@@ -629,11 +625,7 @@ public class TopologyMetadata {
* else returns {@code null} if {@code topologyName} is non-null
but no such NamedTopology exists
*/
public InternalTopologyBuilder lookupBuilderForNamedTopology(final String
topologyName) {
- if (topologyName == null) {
- return builders.get(UNNAMED_TOPOLOGY);
- } else {
- return builders.get(topologyName);
- }
+ return builders.get(getTopologyNameOrElseUnnamed(topologyName));
}
private boolean evaluateConditionIsTrueForAnyBuilders(final
Function<InternalTopologyBuilder, Boolean> condition) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index bc2db14389b..f7ec5784890 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -54,7 +54,6 @@ import
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationE
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -88,6 +87,8 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
+import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -145,10 +146,10 @@ public class RecordCollectorTest {
@BeforeEach
public void setup() {
streamsProducer = new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
mockProducer,
- logContext,
- Time.SYSTEM
+ AT_LEAST_ONCE,
+ Time.SYSTEM,
+ logContext
);
final SinkNode<?, ?> sinkNode = new SinkNode<>(
sinkNodeName,
@@ -1429,15 +1430,15 @@ public class RecordCollectorTest {
logContext,
taskId,
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
- new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
+ new MockProducer<>(cluster, true, byteArraySerializer,
byteArraySerializer) {
@Override
public void abortTransaction() {
functionCalled.set(true);
}
},
- logContext,
- Time.SYSTEM
+ EXACTLY_ONCE_V2,
+ Time.SYSTEM,
+ logContext
),
productionExceptionHandler,
streamsMetrics,
@@ -1454,15 +1455,15 @@ public class RecordCollectorTest {
logContext,
taskId,
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
- new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
+ new MockProducer<>(cluster, true, byteArraySerializer,
byteArraySerializer) {
@Override
public List<PartitionInfo> partitionsFor(final String
topic) {
return Collections.emptyList();
}
},
- logContext,
- Time.SYSTEM
+ AT_LEAST_ONCE,
+ Time.SYSTEM,
+ logContext
),
productionExceptionHandler,
streamsMetrics,
@@ -1487,10 +1488,10 @@ public class RecordCollectorTest {
logContext,
taskId,
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
mockProducer,
- logContext,
- Time.SYSTEM
+ EXACTLY_ONCE_V2,
+ Time.SYSTEM,
+ logContext
),
productionExceptionHandler,
streamsMetrics,
@@ -1883,30 +1884,30 @@ public class RecordCollectorTest {
private StreamsProducer getExceptionalStreamsProducerOnSend(final
Exception exception) {
return new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
- new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
+ new MockProducer<>(cluster, true, byteArraySerializer,
byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, exception);
return null;
}
},
- logContext,
- Time.SYSTEM
+ AT_LEAST_ONCE,
+ Time.SYSTEM,
+ logContext
);
}
private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final
RuntimeException exception) {
return new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
- new MockProducer<byte[], byte[]>(cluster, true,
byteArraySerializer, byteArraySerializer) {
+ new MockProducer<>(cluster, true, byteArraySerializer,
byteArraySerializer) {
@Override
public synchronized List<PartitionInfo> partitionsFor(final
String topic) {
throw exception;
}
},
- logContext,
- Time.SYSTEM
+ AT_LEAST_ONCE,
+ Time.SYSTEM,
+ logContext
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index a08442ac26c..3fbf3b912da 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -55,6 +54,8 @@ import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
+import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
@@ -96,16 +97,16 @@ public class StreamsProducerTest {
@SuppressWarnings("unchecked")
final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
private final StreamsProducer streamsProducerWithMock = new
StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
mockedProducer,
- logContext,
- mockTime
+ AT_LEAST_ONCE,
+ mockTime,
+ logContext
);
private final StreamsProducer eosStreamsProducerWithMock = new
StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
mockedProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
private final MockProducer<byte[], byte[]> nonEosMockProducer
@@ -128,18 +129,18 @@ public class StreamsProducerTest {
public void before() {
nonEosStreamsProducer =
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
nonEosMockProducer,
- logContext,
- mockTime
+ AT_LEAST_ONCE,
+ mockTime,
+ logContext
);
eosStreamsProducer =
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
eosMockProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
eosStreamsProducer.initTransaction();
when(mockTime.nanoseconds()).thenReturn(Time.SYSTEM.nanoseconds());
@@ -216,10 +217,11 @@ public class StreamsProducerTest {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- null,
mockedProducer,
- logContext,
- mockTime)
+ null,
+ mockTime,
+ logContext
+ )
);
assertThat(thrown.getMessage(), is("processingMode cannot be null"));
@@ -230,41 +232,44 @@ public class StreamsProducerTest {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
null,
- logContext,
- mockTime)
+ AT_LEAST_ONCE,
+ mockTime,
+ logContext
+ )
);
assertThat(thrown.getMessage(), is("producer cannot be null"));
}
@Test
- public void shouldFailIfLogContextIsNull() {
+ public void shouldFailIfTimeIsNull() {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
mockedProducer,
+ AT_LEAST_ONCE,
null,
- mockTime)
+ logContext
+ )
);
- assertThat(thrown.getMessage(), is("logContext cannot be null"));
+ assertThat(thrown.getMessage(), is("time cannot be null"));
}
@Test
- public void shouldFailIfTimeIsNull() {
+ public void shouldFailIfLogContextIsNull() {
final NullPointerException thrown = assertThrows(
NullPointerException.class,
() -> new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
mockedProducer,
- logContext,
- null)
+ AT_LEAST_ONCE,
+ mockTime,
+ null
+ )
);
- assertThat(thrown.getMessage(), is("time cannot be null"));
+ assertThat(thrown.getMessage(), is("logContext cannot be null"));
}
@Test
@@ -274,7 +279,7 @@ public class StreamsProducerTest {
() -> nonEosStreamsProducer.resetProducer(null)
);
- assertThat(thrown.getMessage(), is("Expected eos-v2 to be enabled, but
the processing mode was AT_LEAST_ONCE"));
+ assertThat(thrown.getMessage(), is("Expected EOS to be enabled, but
processing mode is at_least_once"));
}
@@ -282,11 +287,6 @@ public class StreamsProducerTest {
// functional tests
- @Test
- public void shouldNotHaveEosEnabledIfEosDisabled() {
- assertThat(nonEosStreamsProducer.eosEnabled(), is(false));
- }
-
@Test
public void shouldNotInitTxIfEosDisable() {
assertThat(nonEosMockProducer.transactionInitialized(), is(false));
@@ -367,16 +367,6 @@ public class StreamsProducerTest {
// functional tests
- @Test
- public void shouldEnableEosIfEosEnabled() {
- assertThat(eosStreamsProducer.eosEnabled(), is(true));
- }
-
- @Test
- public void shouldHaveEosEnabledIfEosEnabled() {
- assertThat(eosStreamsProducer.eosEnabled(), is(true));
- }
-
@Test
public void shouldInitTxOnEos() {
assertThat(eosMockProducer.transactionInitialized(), is(true));
@@ -443,10 +433,10 @@ public class StreamsProducerTest {
when(mockedProducer.send(record, null)).thenReturn(null);
final StreamsProducer streamsProducer = new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
mockedProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
streamsProducer.initTransaction();
// call `send()` to start a transaction
@@ -492,10 +482,10 @@ public class StreamsProducerTest {
nonEosMockProducer.initTransactionException = new
TimeoutException("KABOOM!");
final StreamsProducer streamsProducer = new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
nonEosMockProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
final TimeoutException thrown = assertThrows(
@@ -511,10 +501,10 @@ public class StreamsProducerTest {
// use `nonEosMockProducer` instead of `eosMockProducer` to avoid
auto-init Tx
final StreamsProducer streamsProducer =
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
nonEosMockProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
final IllegalStateException thrown = assertThrows(
@@ -531,10 +521,10 @@ public class StreamsProducerTest {
nonEosMockProducer.initTransactionException = new
KafkaException("KABOOM!");
final StreamsProducer streamsProducer = new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
nonEosMockProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
final StreamsException thrown = assertThrows(
@@ -552,10 +542,10 @@ public class StreamsProducerTest {
nonEosMockProducer.initTransactionException = new
RuntimeException("KABOOM!");
final StreamsProducer streamsProducer = new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
nonEosMockProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
final RuntimeException thrown = assertThrows(
@@ -872,10 +862,10 @@ public class StreamsProducerTest {
@Test
public void shouldResetTransactionInitializedOnResetProducer() {
final StreamsProducer streamsProducer = new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2,
mockedProducer,
- logContext,
- mockTime
+ EXACTLY_ONCE_V2,
+ mockTime,
+ logContext
);
streamsProducer.initTransaction();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
index 23443930284..4843f65f1b5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/DummyStreamsConfig.java
@@ -17,14 +17,12 @@
package org.apache.kafka.streams.processor.internals.testutil;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import java.util.Properties;
public class DummyStreamsConfig extends StreamsConfig {
-
public DummyStreamsConfig() {
super(dummyProps(ProcessingMode.AT_LEAST_ONCE));
}
@@ -37,7 +35,7 @@ public class DummyStreamsConfig extends StreamsConfig {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfigUtils.processingModeString(processingMode));
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingMode.toString());
return props;
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 6c264701f80..0de1e1e606a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -58,6 +57,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -213,10 +213,11 @@ public class KeyValueStoreTestDriver<K, V> {
logContext,
new TaskId(0, 0),
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
new MockProducer<>(null, true, null, null),
- logContext,
- Time.SYSTEM),
+ AT_LEAST_ONCE,
+ Time.SYSTEM,
+ logContext
+ ),
new DefaultProductionExceptionHandler(),
new MockStreamsMetrics(new Metrics()),
topology
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 7603cd63fc8..d1cdf53c3e4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -88,6 +88,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -451,10 +452,10 @@ public class StreamThreadStateStoreProviderTest {
logContext,
taskId,
new StreamsProducer(
- StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
producer,
- logContext,
- Time.SYSTEM
+ AT_LEAST_ONCE,
+ Time.SYSTEM,
+ logContext
),
streamsConfig.productionExceptionHandler(),
new MockStreamsMetrics(metrics),
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 5f9f140d088..08648805792 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -115,7 +116,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -339,17 +339,17 @@ public class TopologyTestDriver implements Closeable {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
- producer = new MockProducer<byte[], byte[]>(true, bytesSerializer,
bytesSerializer) {
+ producer = new MockProducer<>(true, bytesSerializer, bytesSerializer) {
@Override
public List<PartitionInfo> partitionsFor(final String topic) {
return Collections.singletonList(new PartitionInfo(topic,
PARTITION_ID, null, null, null));
}
};
testDriverProducer = new TestDriverProducer(
- StreamsConfigUtils.processingMode(streamsConfig),
producer,
- logContext,
- mockWallClockTime
+ StreamsConfigUtils.processingMode(streamsConfig),
+ mockWallClockTime,
+ logContext
);
setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics,
cache);
@@ -1138,9 +1138,8 @@ public class TopologyTestDriver implements Closeable {
" {} configuration during
TopologyTestDriver#close().",
StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
}
- if (processingMode == AT_LEAST_ONCE) {
- producer.close();
- }
+ producer.close();
+ consumer.close();
stateDirectory.clean();
}
@@ -1354,11 +1353,11 @@ public class TopologyTestDriver implements Closeable {
private static class TestDriverProducer extends StreamsProducer {
- public TestDriverProducer(final StreamsConfigUtils.ProcessingMode
processingMode,
- final Producer<byte[], byte[]> producer,
- final LogContext logContext,
- final Time time) {
- super(processingMode, producer, logContext, time);
+ public TestDriverProducer(final Producer<byte[], byte[]> producer,
+ final ProcessingMode processingMode,
+ final Time time,
+ final LogContext logContext) {
+ super(producer, processingMode, time, logContext);
}
@Override