This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22e8e71  KAFKA-9274: Fix commit-TimeoutException handling for EOS 
(#9800)
22e8e71 is described below

commit 22e8e71156762b40ac93e2cbd42eacba00dbfb0c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 6 14:01:02 2021 -0800

    KAFKA-9274: Fix commit-TimeoutException handling for EOS (#9800)
    
    If EOS is enabled and the TX commit fails with a timeout,
    we should not process more messages (which is ok for non-EOS)
    because we don't really know the status of the TX.
    If the commit was indeed successful, we won't have an open TX
    and calling send() would fail with a fatal error.
    
    Instead, we should retry the (idempotent) commit of the TX,
    and start a new TX afterwards.
    
    Reviewers: Boyang Chen <[email protected]>, John Roesler 
<[email protected]>
---
 .../streams/processor/internals/StreamTask.java    |   9 +
 .../processor/internals/StreamTaskTest.java        | 252 ++++++++++++++-------
 2 files changed, 175 insertions(+), 86 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a82ece3..1450212 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -103,6 +103,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
+    private boolean hasPendingTxCommit = false;
 
     public StreamTask(final TaskId id,
                       final Set<TopicPartition> inputPartitions,
@@ -378,6 +379,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                     // TODO: this should be removed after we decouple caching 
with emitting
                     stateMgr.flushCache();
                     recordCollector.flush();
+                    hasPendingTxCommit = eosEnabled;
 
                     log.debug("Prepared {} task for committing", state());
                     return committableOffsetsAndMetadata();
@@ -476,6 +478,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
 
         commitRequested = false;
         commitNeeded = false;
+        hasPendingTxCommit = false;
     }
 
     private Map<TopicPartition, Long> extractPartitionTimes() {
@@ -633,6 +636,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
             return false;
         }
 
+        if (hasPendingTxCommit) {
+            // if the task has a pending TX commit, we should just retry the 
commit but not process any records
+            // thus, the task is not processable, even if there is available 
data in the record queue
+            return false;
+        }
+
         if (partitionGroup.allPartitionsBuffered()) {
             idleStartTimeMs = RecordQueue.UNKNOWN;
             return true;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ea82337..c1b67d9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -85,6 +85,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
@@ -201,7 +202,15 @@ public class StreamTaskTest {
                                      Collections.emptySet());
     }
 
-    private static StreamsConfig createConfig(final boolean enableEoS, final 
String enforcedProcessingValue) {
+    private static StreamsConfig createConfig() {
+        return createConfig("0");
+    }
+
+    private static StreamsConfig createConfig(final String 
enforcedProcessingValue) {
+        return createConfig(StreamsConfig.AT_LEAST_ONCE, 
enforcedProcessingValue);
+    }
+
+    private static StreamsConfig createConfig(final String eosConfig, final 
String enforcedProcessingValue) {
         final String canonicalPath;
         try {
             canonicalPath = BASE_DIR.getCanonicalPath();
@@ -214,7 +223,7 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, canonicalPath),
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class.getName()),
-            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? 
StreamsConfig.EXACTLY_ONCE : StreamsConfig.AT_LEAST_ONCE),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
             mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 
enforcedProcessingValue)
         )));
     }
@@ -226,7 +235,7 @@ public class StreamTaskTest {
 
         consumer.assign(asList(partition1, partition2));
         consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), 
mkEntry(partition2, 0L)));
-        stateDirectory = new StateDirectory(createConfig(false, "100"), new 
MockTime(), true);
+        stateDirectory = new StateDirectory(createConfig("100"), new 
MockTime(), true);
     }
 
     @After
@@ -256,7 +265,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall();
         EasyMock.replay(stateDirectory, stateManager);
 
-        task = createStatefulTask(createConfig(false, "100"), false);
+        task = createStatefulTask(createConfig("100"), false);
 
         assertThrows(LockException.class, task::initializeIfNeeded);
     }
@@ -266,7 +275,7 @@ public class StreamTaskTest {
         stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
         EasyMock.replay(stateDirectory);
 
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         task.initializeIfNeeded();
 
@@ -300,7 +309,7 @@ public class StreamTaskTest {
         ctrl.checkOrder(true);
         ctrl.replay();
 
-        task = createStatefulTask(createConfig(true, "100"), true, 
stateManager);
+        task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE, 
"100"), true, stateManager);
         task.suspend();
         task.closeDirty();
         task = null;
@@ -316,7 +325,7 @@ public class StreamTaskTest {
         consumer.commitSync(partitions.stream()
             .collect(Collectors.toMap(Function.identity(), tp -> new 
OffsetAndMetadata(0L, encodeTimestamp(10L)))));
 
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         assertEquals(RecordQueue.UNKNOWN, task.streamTime());
 
@@ -337,7 +346,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateDirectory, stateManager, recordCollector);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
 
         assertEquals(Task.State.CREATED, task.state());
 
@@ -363,7 +372,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldProcessInOrder() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
 
         task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 10),
@@ -409,8 +418,73 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertTrue(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+
+        assertFalse(task.process(time.milliseconds()));
+    }
+
+    @Test
+    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() 
{
+        task = createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE, 
"0"), StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertFalse(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+        assertTrue(task.process(time.milliseconds()));
+
+        assertFalse(task.process(time.milliseconds()));
+    }
+
+    @Test
+    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosBetaEnabled() {
+        task = 
createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE_BETA, "0"), 
StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertFalse(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+        assertTrue(task.process(time.milliseconds()));
+
+        assertFalse(task.process(time.milliseconds()));
+    }
+
+    @Test
     public void shouldRecordBufferedRecords() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
 
         final KafkaMetric metric = getMetric("active-buffer", "%s-count", 
task.id().toString(), StreamsConfig.METRICS_LATEST);
 
@@ -432,7 +506,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldRecordProcessRatio() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
 
         final KafkaMetric metric = getMetric("active-process", "%s-ratio", 
task.id().toString(), StreamsConfig.METRICS_LATEST);
 
@@ -559,7 +633,7 @@ public class StreamTaskTest {
     }
 
     private void testMetrics(final String builtInMetricsVersion) {
-        task = createStatelessTask(createConfig(false, "100"), 
builtInMetricsVersion);
+        task = createStatelessTask(createConfig("100"), builtInMetricsVersion);
 
         assertNotNull(getMetric(
             "enforced-processing",
@@ -702,7 +776,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldPauseAndResumeBasedOnBufferedRecords() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 10),
@@ -756,7 +830,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -841,7 +915,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldRespectPunctuateCancellationStreamTime() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -881,7 +955,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldRespectPunctuateCancellationSystemTime() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         final long now = time.milliseconds();
@@ -895,7 +969,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldRespectCommitNeeded() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -933,7 +1007,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCommitNextOffsetFromQueueIfAvailable() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -952,7 +1026,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -970,7 +1044,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldFailOnCommitIfTaskIsClosed() {
-        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), 
StreamsConfig.METRICS_LATEST);
         task.suspend();
         task.transitionTo(Task.State.CLOSED);
 
@@ -984,7 +1058,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldRespectCommitRequested() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -994,13 +1068,13 @@ public class StreamTaskTest {
 
     @Test
     public void shouldEncodeAndDecodeMetadata() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         assertEquals(DEFAULT_TIMESTAMP, 
task.decodeTimestamp(encodeTimestamp(DEFAULT_TIMESTAMP)));
     }
 
     @Test
     public void shouldReturnUnknownTimestampIfUnknownVersion() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         final byte[] emptyMessage = {StreamTask.LATEST_MAGIC_BYTE + 1};
         final String encodedString = 
Base64.getEncoder().encodeToString(emptyMessage);
@@ -1009,14 +1083,14 @@ public class StreamTaskTest {
 
     @Test
     public void shouldReturnUnknownTimestampIfEmptyMessage() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         assertEquals(RecordQueue.UNKNOWN, task.decodeTimestamp(""));
     }
 
     @Test
     public void shouldBeProcessableIfAllPartitionsBuffered() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1036,7 +1110,7 @@ public class StreamTaskTest {
     @Test
     public void shouldBeProcessableIfWaitedForTooLong() {
         // max idle time is 100ms
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1088,8 +1162,8 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotBeProcessableIfNoDataAvailble() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+    public void shouldNotBeProcessableIfNoDataAvailable() {
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1135,7 +1209,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         final long now = time.milliseconds();
@@ -1155,7 +1229,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         assertFalse(task.maybePunctuateSystemTime());
@@ -1166,7 +1240,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldPunctuateOnceSystemTimeAfterGap() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         final long now = time.milliseconds();
@@ -1192,7 +1266,7 @@ public class StreamTaskTest {
 
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
 {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1210,7 +1284,7 @@ public class StreamTaskTest {
 
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime()
 {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1228,16 +1302,22 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotShareHeadersBetweenPunctuateIterations() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
-        task.punctuate(processorSystemTime, 1, 
PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            task.processorContext().headers().add("dummy", (byte[]) null);
-        });
-        task.punctuate(processorSystemTime, 1, 
PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            
assertFalse(task.processorContext().headers().iterator().hasNext());
-        });
+        task.punctuate(
+            processorSystemTime,
+            1L,
+            PunctuationType.WALL_CLOCK_TIME,
+            timestamp -> task.processorContext().headers().add("dummy", null)
+        );
+        task.punctuate(
+            processorSystemTime,
+            1L,
+            PunctuationType.WALL_CLOCK_TIME,
+            timestamp -> 
assertFalse(task.processorContext().headers().iterator().hasNext())
+        );
     }
 
     @Test
@@ -1247,7 +1327,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createFaultyStatefulTask(createConfig(false, "100"));
+        task = createFaultyStatefulTask(createConfig("100"));
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1275,7 +1355,7 @@ public class StreamTaskTest {
 
         EasyMock.replay(recordCollector, stateDirectory, stateManager);
 
-        task = createDisconnectedTask(createConfig(false, "100"));
+        task = createDisconnectedTask(createConfig("100"));
 
         task.initializeIfNeeded();
 
@@ -1291,7 +1371,7 @@ public class StreamTaskTest {
 
         EasyMock.replay(recordCollector, stateDirectory, stateManager);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
 
         task.initializeIfNeeded();
 
@@ -1333,7 +1413,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1363,7 +1443,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1384,7 +1464,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createStatefulTask(createConfig(true, "100"), true);
+        task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE, 
"100"), true);
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1400,7 +1480,7 @@ public class StreamTaskTest {
 
     @Test
     public void 
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         task.processorContext().setCurrentNode(processorStreamTime);
@@ -1414,7 +1494,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCallPunctuateOnPassedInProcessorNode() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, 
punctuator);
@@ -1425,7 +1505,7 @@ public class StreamTaskTest {
 
     @Test
     public void 
shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, 
punctuator);
@@ -1434,13 +1514,13 @@ public class StreamTaskTest {
 
     @Test(expected = IllegalStateException.class)
     public void 
shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { });
     }
 
     @Test
     public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
         task.processorContext().setCurrentNode(processorStreamTime);
         task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { });
     }
@@ -1454,7 +1534,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createFaultyStatefulTask(createConfig(false, "100"));
+        task = createFaultyStatefulTask(createConfig("100"));
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1481,7 +1561,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        final StreamsConfig config = createConfig(false, "0");
+        final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
             taskId,
             config,
@@ -1532,7 +1612,7 @@ public class StreamTaskTest {
             }
         };
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.initializeIfNeeded();
 
@@ -1541,7 +1621,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowIfCommittingOnIllegalState() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
@@ -1550,7 +1630,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowIfPostCommittingOnIllegalState() {
-        task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig("100"), 
StreamsConfig.METRICS_LATEST);
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
@@ -1563,7 +1643,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().andThrow(new AssertionError("Should not have 
tried to checkpoint"));
         EasyMock.replay(stateManager);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         task.suspend();
         task.postCommit(true);
     }
@@ -1576,7 +1656,7 @@ public class StreamTaskTest {
                 .andReturn(Collections.singletonMap(partition1, 1L));
         EasyMock.replay(stateManager);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
         task.suspend();
         task.postCommit(true);
@@ -1592,7 +1672,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint 
should not be called")).anyTimes();
         EasyMock.replay(stateManager);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1613,7 +1693,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().times(2);
         EasyMock.replay(stateManager);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1637,7 +1717,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).once();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createStatefulTask(createConfig(false, "0"), true);
+        task = createStatefulTask(createConfig(), true);
         task.initializeIfNeeded();
         task.completeRestoration();
         task.addRecords(partition1, singleton(getConsumerRecord(partition1, 
10)));
@@ -1656,7 +1736,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.initializeIfNeeded();
 
@@ -1678,7 +1758,7 @@ public class StreamTaskTest {
 
         final MetricName metricName = setupCloseTaskMetric();
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
         task.closeClean();
@@ -1704,7 +1784,7 @@ public class StreamTaskTest {
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.initializeIfNeeded();
         task.completeRestoration();
@@ -1732,7 +1812,7 @@ public class StreamTaskTest {
                 .andReturn(Collections.singletonMap(partition1, offset + 
12000L));
         EasyMock.replay(recordCollector, stateManager);
 
-        task = createOptimizedStatefulTask(createConfig(false, "0"), consumer);
+        task = createOptimizedStatefulTask(createConfig(), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1763,7 +1843,7 @@ public class StreamTaskTest {
         EasyMock.replay(recordCollector, stateManager);
         final MetricName metricName = setupCloseTaskMetric();
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1805,7 +1885,7 @@ public class StreamTaskTest {
         EasyMock.replay(recordCollector, stateManager);
         final MetricName metricName = setupCloseTaskMetric();
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1840,7 +1920,7 @@ public class StreamTaskTest {
         EasyMock.replay(recordCollector, stateManager);
         final MetricName metricName = setupCloseTaskMetric();
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
 
         task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset)));
@@ -1871,7 +1951,7 @@ public class StreamTaskTest {
         EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
 
         task.suspend();
@@ -1886,7 +1966,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
@@ -1900,7 +1980,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
@@ -1914,7 +1994,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
@@ -1928,7 +2008,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
         task.closeClean();
@@ -1943,7 +2023,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldUpdatePartitions() {
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
         final Set<TopicPartition> newPartitions = new HashSet<>(task.inputPartitions());
         newPartitions.add(new TopicPartition("newTopic", 0));
 
@@ -1957,7 +2037,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowIfCleanClosingDirtyTask() {
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1970,7 +2050,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowIfRecyclingDirtyTask() {
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
         task.initializeIfNeeded();
         task.completeRestoration();
 
@@ -1988,7 +2068,7 @@ public class StreamTaskTest {
         EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
 
         task.initializeIfNeeded();
@@ -2006,7 +2086,7 @@ public class StreamTaskTest {
     @Test
     public void shouldAlwaysSuspendCreatedTasks() {
         EasyMock.replay(stateManager);
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         assertThat(task.state(), equalTo(CREATED));
         task.suspend();
         assertThat(task.state(), equalTo(SUSPENDED));
@@ -2016,7 +2096,7 @@ public class StreamTaskTest {
     public void shouldAlwaysSuspendRestoringTasks() {
         EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager);
-        task = createStatefulTask(createConfig(false, "100"), true);
+        task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
         assertThat(task.state(), equalTo(RESTORING));
         task.suspend();
@@ -2027,7 +2107,7 @@ public class StreamTaskTest {
     public void shouldAlwaysSuspendRunningTasks() {
         EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager);
-        task = createFaultyStatefulTask(createConfig(false, "100"));
+        task = createFaultyStatefulTask(createConfig("100"));
         task.initializeIfNeeded();
         task.completeRestoration();
         assertThat(task.state(), equalTo(RUNNING));
@@ -2039,7 +2119,7 @@ public class StreamTaskTest {
     public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
         final InternalProcessorContext context = new ProcessorContextImpl(
                 taskId,
-                createConfig(false, "100"),
+                createConfig("100"),
                 stateManager,
                 streamsMetrics,
                 null
@@ -2049,7 +2129,7 @@ public class StreamTaskTest {
         EasyMock.replay(stateManager);
 
         // The processor topology is missing the topics
-        final ProcessorTopology topology = withSources(asList(), mkMap());
+        final ProcessorTopology topology = withSources(emptyList(), mkMap());
 
         final TopologyException  exception = assertThrows(
             TopologyException.class,
@@ -2058,7 +2138,7 @@ public class StreamTaskTest {
                 partitions,
                 topology,
                 consumer,
-                createConfig(false, "100"),
+                createConfig("100"),
                 metrics,
                 stateDirectory,
                 cache,
@@ -2077,7 +2157,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldInitTaskTimeoutAndEventuallyThrow() {
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null);
@@ -2090,7 +2170,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCLearTaskTimeout() {
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
@@ -2290,7 +2370,7 @@ public class StreamTaskTest {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
-        final StreamsConfig config = createConfig(false, "0");
+        final StreamsConfig config = createConfig();
 
         final InternalProcessorContext context = new ProcessorContextImpl(
             taskId,

Reply via email to