This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22e8e71 KAFKA-9274: Fix commit-TimeoutException handling for EOS
(#9800)
22e8e71 is described below
commit 22e8e71156762b40ac93e2cbd42eacba00dbfb0c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 6 14:01:02 2021 -0800
KAFKA-9274: Fix commit-TimeoutException handling for EOS (#9800)
If EOS is enabled and the TX commit fails with a timeout,
we should not process more messages (which is OK for non-EOS)
because we don't really know the status of the TX.
If the commit was indeed successful, we won't have an open TX
and calling send() would fail with a fatal error.
Instead, we should retry the (idempotent) commit of the TX,
and start a new TX afterwards.
Reviewers: Boyang Chen <[email protected]>, John Roesler
<[email protected]>
---
.../streams/processor/internals/StreamTask.java | 9 +
.../processor/internals/StreamTaskTest.java | 252 ++++++++++++++-------
2 files changed, 175 insertions(+), 86 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a82ece3..1450212 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -103,6 +103,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
private long idleStartTimeMs;
private boolean commitNeeded = false;
private boolean commitRequested = false;
+ private boolean hasPendingTxCommit = false;
public StreamTask(final TaskId id,
final Set<TopicPartition> inputPartitions,
@@ -378,6 +379,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// TODO: this should be removed after we decouple caching
with emitting
stateMgr.flushCache();
recordCollector.flush();
+ hasPendingTxCommit = eosEnabled;
log.debug("Prepared {} task for committing", state());
return committableOffsetsAndMetadata();
@@ -476,6 +478,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
commitRequested = false;
commitNeeded = false;
+ hasPendingTxCommit = false;
}
private Map<TopicPartition, Long> extractPartitionTimes() {
@@ -633,6 +636,12 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
return false;
}
+ if (hasPendingTxCommit) {
+ // if the task has a pending TX commit, we should just retry the
commit but not process any records
+ // thus, the task is not processable, even if there is available
data in the record queue
+ return false;
+ }
+
if (partitionGroup.allPartitionsBuffered()) {
idleStartTimeMs = RecordQueue.UNKNOWN;
return true;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ea82337..c1b67d9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -85,6 +85,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@@ -201,7 +202,15 @@ public class StreamTaskTest {
Collections.emptySet());
}
- private static StreamsConfig createConfig(final boolean enableEoS, final
String enforcedProcessingValue) {
+ private static StreamsConfig createConfig() {
+ return createConfig("0");
+ }
+
+ private static StreamsConfig createConfig(final String
enforcedProcessingValue) {
+ return createConfig(StreamsConfig.AT_LEAST_ONCE,
enforcedProcessingValue);
+ }
+
+ private static StreamsConfig createConfig(final String eosConfig, final
String enforcedProcessingValue) {
final String canonicalPath;
try {
canonicalPath = BASE_DIR.getCanonicalPath();
@@ -214,7 +223,7 @@ public class StreamTaskTest {
mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, canonicalPath),
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
MockTimestampExtractor.class.getName()),
- mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ?
StreamsConfig.EXACTLY_ONCE : StreamsConfig.AT_LEAST_ONCE),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
enforcedProcessingValue)
)));
}
@@ -226,7 +235,7 @@ public class StreamTaskTest {
consumer.assign(asList(partition1, partition2));
consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L),
mkEntry(partition2, 0L)));
- stateDirectory = new StateDirectory(createConfig(false, "100"), new
MockTime(), true);
+ stateDirectory = new StateDirectory(createConfig("100"), new
MockTime(), true);
}
@After
@@ -256,7 +265,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall();
EasyMock.replay(stateDirectory, stateManager);
- task = createStatefulTask(createConfig(false, "100"), false);
+ task = createStatefulTask(createConfig("100"), false);
assertThrows(LockException.class, task::initializeIfNeeded);
}
@@ -266,7 +275,7 @@ public class StreamTaskTest {
stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
EasyMock.replay(stateDirectory);
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
@@ -300,7 +309,7 @@ public class StreamTaskTest {
ctrl.checkOrder(true);
ctrl.replay();
- task = createStatefulTask(createConfig(true, "100"), true,
stateManager);
+ task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE,
"100"), true, stateManager);
task.suspend();
task.closeDirty();
task = null;
@@ -316,7 +325,7 @@ public class StreamTaskTest {
consumer.commitSync(partitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> new
OffsetAndMetadata(0L, encodeTimestamp(10L)))));
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
assertEquals(RecordQueue.UNKNOWN, task.streamTime());
@@ -337,7 +346,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateDirectory, stateManager, recordCollector);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
assertEquals(Task.State.CREATED, task.state());
@@ -363,7 +372,7 @@ public class StreamTaskTest {
@Test
public void shouldProcessInOrder() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.addRecords(partition1, asList(
getConsumerRecord(partition1, 10),
@@ -409,8 +418,73 @@ public class StreamTaskTest {
}
@Test
+ public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
+
+ assertFalse(task.process(time.milliseconds()));
+
+ task.addRecords(partition1, asList(
+ getConsumerRecord(partition1, 10),
+ getConsumerRecord(partition1, 20),
+ getConsumerRecord(partition1, 30)
+ ));
+
+ assertTrue(task.process(time.milliseconds()));
+ task.prepareCommit();
+ assertTrue(task.process(time.milliseconds()));
+ task.postCommit(false);
+ assertTrue(task.process(time.milliseconds()));
+
+ assertFalse(task.process(time.milliseconds()));
+ }
+
+ @Test
+ public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled()
{
+ task = createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE,
"0"), StreamsConfig.METRICS_LATEST);
+
+ assertFalse(task.process(time.milliseconds()));
+
+ task.addRecords(partition1, asList(
+ getConsumerRecord(partition1, 10),
+ getConsumerRecord(partition1, 20),
+ getConsumerRecord(partition1, 30)
+ ));
+
+ assertTrue(task.process(time.milliseconds()));
+ task.prepareCommit();
+ assertFalse(task.process(time.milliseconds()));
+ task.postCommit(false);
+ assertTrue(task.process(time.milliseconds()));
+ assertTrue(task.process(time.milliseconds()));
+
+ assertFalse(task.process(time.milliseconds()));
+ }
+
+ @Test
+ public void shouldNotProcessRecordsAfterPrepareCommitWhenEosBetaEnabled() {
+ task =
createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE_BETA, "0"),
StreamsConfig.METRICS_LATEST);
+
+ assertFalse(task.process(time.milliseconds()));
+
+ task.addRecords(partition1, asList(
+ getConsumerRecord(partition1, 10),
+ getConsumerRecord(partition1, 20),
+ getConsumerRecord(partition1, 30)
+ ));
+
+ assertTrue(task.process(time.milliseconds()));
+ task.prepareCommit();
+ assertFalse(task.process(time.milliseconds()));
+ task.postCommit(false);
+ assertTrue(task.process(time.milliseconds()));
+ assertTrue(task.process(time.milliseconds()));
+
+ assertFalse(task.process(time.milliseconds()));
+ }
+
+ @Test
public void shouldRecordBufferedRecords() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
final KafkaMetric metric = getMetric("active-buffer", "%s-count",
task.id().toString(), StreamsConfig.METRICS_LATEST);
@@ -432,7 +506,7 @@ public class StreamTaskTest {
@Test
public void shouldRecordProcessRatio() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
final KafkaMetric metric = getMetric("active-process", "%s-ratio",
task.id().toString(), StreamsConfig.METRICS_LATEST);
@@ -559,7 +633,7 @@ public class StreamTaskTest {
}
private void testMetrics(final String builtInMetricsVersion) {
- task = createStatelessTask(createConfig(false, "100"),
builtInMetricsVersion);
+ task = createStatelessTask(createConfig("100"), builtInMetricsVersion);
assertNotNull(getMetric(
"enforced-processing",
@@ -702,7 +776,7 @@ public class StreamTaskTest {
@Test
public void shouldPauseAndResumeBasedOnBufferedRecords() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.addRecords(partition1, asList(
getConsumerRecord(partition1, 10),
@@ -756,7 +830,7 @@ public class StreamTaskTest {
@Test
public void shouldPunctuateOnceStreamTimeAfterGap() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -841,7 +915,7 @@ public class StreamTaskTest {
@Test
public void shouldRespectPunctuateCancellationStreamTime() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -881,7 +955,7 @@ public class StreamTaskTest {
@Test
public void shouldRespectPunctuateCancellationSystemTime() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
final long now = time.milliseconds();
@@ -895,7 +969,7 @@ public class StreamTaskTest {
@Test
public void shouldRespectCommitNeeded() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -933,7 +1007,7 @@ public class StreamTaskTest {
@Test
public void shouldCommitNextOffsetFromQueueIfAvailable() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -952,7 +1026,7 @@ public class StreamTaskTest {
@Test
public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -970,7 +1044,7 @@ public class StreamTaskTest {
@Test
public void shouldFailOnCommitIfTaskIsClosed() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.suspend();
task.transitionTo(Task.State.CLOSED);
@@ -984,7 +1058,7 @@ public class StreamTaskTest {
@Test
public void shouldRespectCommitRequested() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -994,13 +1068,13 @@ public class StreamTaskTest {
@Test
public void shouldEncodeAndDecodeMetadata() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
assertEquals(DEFAULT_TIMESTAMP,
task.decodeTimestamp(encodeTimestamp(DEFAULT_TIMESTAMP)));
}
@Test
public void shouldReturnUnknownTimestampIfUnknownVersion() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
final byte[] emptyMessage = {StreamTask.LATEST_MAGIC_BYTE + 1};
final String encodedString =
Base64.getEncoder().encodeToString(emptyMessage);
@@ -1009,14 +1083,14 @@ public class StreamTaskTest {
@Test
public void shouldReturnUnknownTimestampIfEmptyMessage() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
assertEquals(RecordQueue.UNKNOWN, task.decodeTimestamp(""));
}
@Test
public void shouldBeProcessableIfAllPartitionsBuffered() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1036,7 +1110,7 @@ public class StreamTaskTest {
@Test
public void shouldBeProcessableIfWaitedForTooLong() {
// max idle time is 100ms
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1088,8 +1162,8 @@ public class StreamTaskTest {
}
@Test
- public void shouldNotBeProcessableIfNoDataAvailble() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ public void shouldNotBeProcessableIfNoDataAvailable() {
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1135,7 +1209,7 @@ public class StreamTaskTest {
@Test
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
final long now = time.milliseconds();
@@ -1155,7 +1229,7 @@ public class StreamTaskTest {
@Test
public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
assertFalse(task.maybePunctuateSystemTime());
@@ -1166,7 +1240,7 @@ public class StreamTaskTest {
@Test
public void shouldPunctuateOnceSystemTimeAfterGap() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
final long now = time.milliseconds();
@@ -1192,7 +1266,7 @@ public class StreamTaskTest {
@Test
public void
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
{
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1210,7 +1284,7 @@ public class StreamTaskTest {
@Test
public void
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime()
{
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1228,16 +1302,22 @@ public class StreamTaskTest {
@Test
public void shouldNotShareHeadersBetweenPunctuateIterations() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
- task.punctuate(processorSystemTime, 1,
PunctuationType.WALL_CLOCK_TIME, timestamp -> {
- task.processorContext().headers().add("dummy", (byte[]) null);
- });
- task.punctuate(processorSystemTime, 1,
PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-
assertFalse(task.processorContext().headers().iterator().hasNext());
- });
+ task.punctuate(
+ processorSystemTime,
+ 1L,
+ PunctuationType.WALL_CLOCK_TIME,
+ timestamp -> task.processorContext().headers().add("dummy", null)
+ );
+ task.punctuate(
+ processorSystemTime,
+ 1L,
+ PunctuationType.WALL_CLOCK_TIME,
+ timestamp ->
assertFalse(task.processorContext().headers().iterator().hasNext())
+ );
}
@Test
@@ -1247,7 +1327,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createFaultyStatefulTask(createConfig(false, "100"));
+ task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration();
@@ -1275,7 +1355,7 @@ public class StreamTaskTest {
EasyMock.replay(recordCollector, stateDirectory, stateManager);
- task = createDisconnectedTask(createConfig(false, "100"));
+ task = createDisconnectedTask(createConfig("100"));
task.initializeIfNeeded();
@@ -1291,7 +1371,7 @@ public class StreamTaskTest {
EasyMock.replay(recordCollector, stateDirectory, stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
@@ -1333,7 +1413,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall();
EasyMock.replay(stateManager, recordCollector);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1363,7 +1443,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall();
EasyMock.replay(stateManager, recordCollector);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1384,7 +1464,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createStatefulTask(createConfig(true, "100"), true);
+ task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE,
"100"), true);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1400,7 +1480,7 @@ public class StreamTaskTest {
@Test
public void
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.processorContext().setCurrentNode(processorStreamTime);
@@ -1414,7 +1494,7 @@ public class StreamTaskTest {
@Test
public void shouldCallPunctuateOnPassedInProcessorNode() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME,
punctuator);
@@ -1425,7 +1505,7 @@ public class StreamTaskTest {
@Test
public void
shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME,
punctuator);
@@ -1434,13 +1514,13 @@ public class StreamTaskTest {
@Test(expected = IllegalStateException.class)
public void
shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { });
}
@Test
public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.processorContext().setCurrentNode(processorStreamTime);
task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { });
}
@@ -1454,7 +1534,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createFaultyStatefulTask(createConfig(false, "100"));
+ task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration();
@@ -1481,7 +1561,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- final StreamsConfig config = createConfig(false, "0");
+ final StreamsConfig config = createConfig();
final InternalProcessorContext context = new ProcessorContextImpl(
taskId,
config,
@@ -1532,7 +1612,7 @@ public class StreamTaskTest {
}
};
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
@@ -1541,7 +1621,7 @@ public class StreamTaskTest {
@Test
public void shouldThrowIfCommittingOnIllegalState() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.transitionTo(Task.State.SUSPENDED);
task.transitionTo(Task.State.CLOSED);
@@ -1550,7 +1630,7 @@ public class StreamTaskTest {
@Test
public void shouldThrowIfPostCommittingOnIllegalState() {
- task = createStatelessTask(createConfig(false, "100"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig("100"),
StreamsConfig.METRICS_LATEST);
task.transitionTo(Task.State.SUSPENDED);
task.transitionTo(Task.State.CLOSED);
@@ -1563,7 +1643,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall().andThrow(new AssertionError("Should not have
tried to checkpoint"));
EasyMock.replay(stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.suspend();
task.postCommit(true);
}
@@ -1576,7 +1656,7 @@ public class StreamTaskTest {
.andReturn(Collections.singletonMap(partition1, 1L));
EasyMock.replay(stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.suspend();
task.postCommit(true);
@@ -1592,7 +1672,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint
should not be called")).anyTimes();
EasyMock.replay(stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1613,7 +1693,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall().times(2);
EasyMock.replay(stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1637,7 +1717,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).once();
EasyMock.replay(stateManager, recordCollector);
- task = createStatefulTask(createConfig(false, "0"), true);
+ task = createStatefulTask(createConfig(), true);
task.initializeIfNeeded();
task.completeRestoration();
task.addRecords(partition1, singleton(getConsumerRecord(partition1,
10)));
@@ -1656,7 +1736,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
@@ -1678,7 +1758,7 @@ public class StreamTaskTest {
final MetricName metricName = setupCloseTaskMetric();
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.suspend();
task.closeClean();
@@ -1704,7 +1784,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1732,7 +1812,7 @@ public class StreamTaskTest {
.andReturn(Collections.singletonMap(partition1, offset +
12000L));
EasyMock.replay(recordCollector, stateManager);
- task = createOptimizedStatefulTask(createConfig(false, "0"), consumer);
+ task = createOptimizedStatefulTask(createConfig(), consumer);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1763,7 +1843,7 @@ public class StreamTaskTest {
EasyMock.replay(recordCollector, stateManager);
final MetricName metricName = setupCloseTaskMetric();
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1805,7 +1885,7 @@ public class StreamTaskTest {
EasyMock.replay(recordCollector, stateManager);
final MetricName metricName = setupCloseTaskMetric();
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1840,7 +1920,7 @@ public class StreamTaskTest {
EasyMock.replay(recordCollector, stateManager);
final MetricName metricName = setupCloseTaskMetric();
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.addRecords(partition1,
singletonList(getConsumerRecord(partition1, offset)));
@@ -1871,7 +1951,7 @@ public class StreamTaskTest {
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.suspend();
@@ -1886,7 +1966,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.suspend();
assertThat(getTaskMetrics(), not(empty()));
@@ -1900,7 +1980,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.suspend();
assertThat(getTaskMetrics(), not(empty()));
@@ -1914,7 +1994,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.suspend();
assertThat(getTaskMetrics(), not(empty()));
@@ -1928,7 +2008,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.suspend();
task.closeClean();
@@ -1943,7 +2023,7 @@ public class StreamTaskTest {
@Test
public void shouldUpdatePartitions() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
final Set<TopicPartition> newPartitions = new
HashSet<>(task.inputPartitions());
newPartitions.add(new TopicPartition("newTopic", 0));
@@ -1957,7 +2037,7 @@ public class StreamTaskTest {
@Test
public void shouldThrowIfCleanClosingDirtyTask() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1970,7 +2050,7 @@ public class StreamTaskTest {
@Test
public void shouldThrowIfRecyclingDirtyTask() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
@@ -1988,7 +2068,7 @@ public class StreamTaskTest {
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
assertThrows(IllegalStateException.class, () ->
task.closeCleanAndRecycleState()); // CREATED
task.initializeIfNeeded();
@@ -2006,7 +2086,7 @@ public class StreamTaskTest {
@Test
public void shouldAlwaysSuspendCreatedTasks() {
EasyMock.replay(stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
assertThat(task.state(), equalTo(CREATED));
task.suspend();
assertThat(task.state(), equalTo(SUSPENDED));
@@ -2016,7 +2096,7 @@ public class StreamTaskTest {
public void shouldAlwaysSuspendRestoringTasks() {
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager);
- task = createStatefulTask(createConfig(false, "100"), true);
+ task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
assertThat(task.state(), equalTo(RESTORING));
task.suspend();
@@ -2027,7 +2107,7 @@ public class StreamTaskTest {
public void shouldAlwaysSuspendRunningTasks() {
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager);
- task = createFaultyStatefulTask(createConfig(false, "100"));
+ task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration();
assertThat(task.state(), equalTo(RUNNING));
@@ -2039,7 +2119,7 @@ public class StreamTaskTest {
public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
final InternalProcessorContext context = new ProcessorContextImpl(
taskId,
- createConfig(false, "100"),
+ createConfig("100"),
stateManager,
streamsMetrics,
null
@@ -2049,7 +2129,7 @@ public class StreamTaskTest {
EasyMock.replay(stateManager);
// The processor topology is missing the topics
- final ProcessorTopology topology = withSources(asList(), mkMap());
+ final ProcessorTopology topology = withSources(emptyList(), mkMap());
final TopologyException exception = assertThrows(
TopologyException.class,
@@ -2058,7 +2138,7 @@ public class StreamTaskTest {
partitions,
topology,
consumer,
- createConfig(false, "100"),
+ createConfig("100"),
metrics,
stateDirectory,
cache,
@@ -2077,7 +2157,7 @@ public class StreamTaskTest {
@Test
public void shouldInitTaskTimeoutAndEventuallyThrow() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.maybeInitTaskTimeoutOrThrow(0L, null);
task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(),
null);
@@ -2090,7 +2170,7 @@ public class StreamTaskTest {
@Test
public void shouldCLearTaskTimeout() {
- task = createStatelessTask(createConfig(false, "0"),
StreamsConfig.METRICS_LATEST);
+ task = createStatelessTask(createConfig(),
StreamsConfig.METRICS_LATEST);
task.maybeInitTaskTimeoutOrThrow(0L, null);
task.clearTaskTimeout();
@@ -2290,7 +2370,7 @@ public class StreamTaskTest {
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
- final StreamsConfig config = createConfig(false, "0");
+ final StreamsConfig config = createConfig();
final InternalProcessorContext context = new ProcessorContextImpl(
taskId,