This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 079e5d647ce KAFKA-15326: [8/N] Move consumer interaction out of
processing methods (#14226)
079e5d647ce is described below
commit 079e5d647ce39cf2ab5b5f37c5ce28b59fb6db13
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 26 18:17:23 2023 +0200
KAFKA-15326: [8/N] Move consumer interaction out of processing methods
(#14226)
The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:
* It resumes polling for partitions whose record buffers have free space again
* It fetches the lag from the consumer
We introduce updateLags() and
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.
Reviewer: Bruno Cadonna <[email protected]>
---
.../processor/internals/PartitionGroup.java | 28 ++++++-
.../streams/processor/internals/ReadOnlyTask.java | 10 +++
.../streams/processor/internals/StandbyTask.java | 10 +++
.../streams/processor/internals/StreamTask.java | 20 ++++-
.../streams/processor/internals/StreamThread.java | 8 +-
.../kafka/streams/processor/internals/Task.java | 11 +++
.../streams/processor/internals/TaskManager.java | 23 +++++-
.../integration/PauseResumeIntegrationTest.java | 4 +-
.../integration/utils/IntegrationTestUtils.java | 2 +-
.../processor/internals/PartitionGroupTest.java | 87 +++++++++++++++++++++-
.../processor/internals/StreamTaskTest.java | 39 ++++++++++
.../processor/internals/StreamThreadTest.java | 30 ++++++++
.../processor/internals/TaskManagerTest.java | 46 ++++++++++++
.../apache/kafka/streams/TopologyTestDriver.java | 2 +
14 files changed, 306 insertions(+), 14 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 7ce538e66ba..b0a3f6f4f98 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -70,6 +70,7 @@ public class PartitionGroup {
private int totalBuffered;
private boolean allBuffered;
private final Map<TopicPartition, Long> idlePartitionDeadlines = new
HashMap<>();
+ private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
static class RecordInfo {
RecordQueue queue;
@@ -140,20 +141,22 @@ public class PartitionGroup {
idlePartitionDeadlines.remove(partition);
queued.add(partition);
} else {
- final OptionalLong fetchedLag = lagProvider.apply(partition);
+ final Long fetchedLag = fetchedLags.getOrDefault(partition,
-1L);
- if (!fetchedLag.isPresent()) {
+ logger.trace("Fetched lag for {} is {}", partition,
fetchedLag);
+
+ if (fetchedLag == -1L) {
// must wait to fetch metadata for the partition
idlePartitionDeadlines.remove(partition);
logger.trace("Waiting to fetch data for {}", partition);
return false;
- } else if (fetchedLag.getAsLong() > 0L) {
+ } else if (fetchedLag > 0L) {
// must wait to poll the data we know to be on the broker
idlePartitionDeadlines.remove(partition);
logger.trace(
"Lag for {} is currently {}, but no data is
buffered locally. Waiting to buffer some records.",
partition,
- fetchedLag.getAsLong()
+ fetchedLag
);
return false;
} else {
@@ -359,6 +362,7 @@ public class PartitionGroup {
return totalBuffered;
}
+ // for testing only
boolean allPartitionsBufferedLocally() {
return allBuffered;
}
@@ -370,6 +374,7 @@ public class PartitionGroup {
nonEmptyQueuesByTime.clear();
totalBuffered = 0;
streamTime = RecordQueue.UNKNOWN;
+ fetchedLags.clear();
}
void close() {
@@ -377,4 +382,19 @@ public class PartitionGroup {
queue.close();
}
}
+
+ void updateLags() {
+ if (maxTaskIdleMs != StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+ for (final TopicPartition tp : partitionQueues.keySet()) {
+ final OptionalLong l = lagProvider.apply(tp);
+ if (l.isPresent()) {
+ fetchedLags.put(tp, l.getAsLong());
+ logger.trace("Updated lag for {} to {}", tp,
l.getAsLong());
+ } else {
+ fetchedLags.remove(tp);
+ }
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index cd1525cce8e..2fb37564155 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -134,6 +134,16 @@ public class ReadOnlyTask implements Task {
throw new UnsupportedOperationException("This task is read-only");
}
+ @Override
+ public void resumePollingForPartitionsWithAvailableSpace() {
+ throw new UnsupportedOperationException("This task is read-only");
+ }
+
+ @Override
+ public void updateLags() {
+ throw new UnsupportedOperationException("This task is read-only");
+ }
+
@Override
public void addRecords(final TopicPartition partition, final
Iterable<ConsumerRecord<byte[], byte[]>> records) {
throw new UnsupportedOperationException("This task is read-only");
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 50905af8542..af4cb383808 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -254,6 +254,16 @@ public class StandbyTask extends AbstractTask implements
Task {
log.info("Closed and recycled state");
}
+ @Override
+ public void resumePollingForPartitionsWithAvailableSpace() {
+ // noop
+ }
+
+ @Override
+ public void updateLags() {
+ // noop
+ }
+
private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index dffd0ecef29..022ddbcf4c4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -84,6 +84,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
private final Map<TopicPartition, Long> committedOffsets;
private final Map<TopicPartition, Long> highWatermark;
private final Set<TopicPartition> resetOffsetsForPartitions;
+ private final Set<TopicPartition> partitionsToResume;
private final PunctuationQueue streamTimePunctuationQueue;
private final PunctuationQueue systemTimePunctuationQueue;
private final StreamsMetricsImpl streamsMetrics;
@@ -176,6 +177,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// initialize the consumed and committed offset cache
consumedOffsets = new HashMap<>();
resetOffsetsForPartitions = new HashSet<>();
+ partitionsToResume = new HashSet<>();
recordQueueCreator = new RecordQueueCreator(this.logContext,
config.timestampExtractor, config.deserializationExceptionHandler);
@@ -590,6 +592,21 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
log.info("Closed and recycled state");
}
+ @Override
+ public void resumePollingForPartitionsWithAvailableSpace() {
+ if (!partitionsToResume.isEmpty()) {
+ mainConsumer.resume(partitionsToResume);
+ partitionsToResume.clear();
+ }
+ }
+
+ @Override
+ public void updateLags() {
+ if (state() == State.RUNNING) {
+ partitionGroup.updateLags();
+ }
+ }
+
/**
* The following exceptions maybe thrown from the state manager flushing
call
*
@@ -678,6 +695,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
record = null;
closeTaskSensor.record();
+ partitionsToResume.clear();
transitionTo(State.CLOSED);
}
@@ -748,7 +766,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// after processing this record, if its partition queue's buffered
size has been
// decreased to the threshold, we can then resume the consumption
on this partition
if (recordInfo.queue().size() == maxBufferedSize) {
- mainConsumer.resume(singleton(partition));
+ partitionsToResume.add(partition);
}
record = null;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 572fa4e353b..a7ac2566248 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -770,7 +770,13 @@ public class StreamThread extends Thread {
final long startMs = time.milliseconds();
now = startMs;
- final long pollLatency = pollPhase();
+ final long pollLatency;
+ taskManager.resumePollingForPartitionsWithAvailableSpace();
+ try {
+ pollLatency = pollPhase();
+ } finally {
+ taskManager.updateLags();
+ }
// Shutdown hook could potentially be triggered and transit the thread
state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the
state transition happens during #onPartitionsAssigned().
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 1bd7f9c7f19..3ea6d44e4da 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -162,6 +162,17 @@ public interface Task {
*/
void prepareRecycle();
+ /**
+ * Resumes polling in the main consumer for all partitions for which
+ * the corresponding record queues have capacity (again).
+ */
+ void resumePollingForPartitionsWithAvailableSpace();
+
+ /**
+ * Fetches up-to-date lag information from the consumer.
+ */
+ void updateLags();
+
// runtime methods (using in RUNNING state)
void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[],
byte[]>> records);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 349d1ccccba..e7d8bbb3dae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1635,6 +1635,25 @@ public class TaskManager {
return commit(tasks.allTasks());
}
+ /**
+ * Resumes polling in the main consumer for all partitions for which
+ * the corresponding record queues have capacity (again).
+ */
+ public void resumePollingForPartitionsWithAvailableSpace() {
+ for (final Task t: tasks.activeTasks()) {
+ t.resumePollingForPartitionsWithAvailableSpace();
+ }
+ }
+
+ /**
+ * Fetches up-to-date lag information from the consumer.
+ */
+ public void updateLags() {
+ for (final Task t: tasks.activeTasks()) {
+ t.updateLags();
+ }
+ }
+
/**
* Take records and add them to each respective task
*
@@ -1902,8 +1921,4 @@ public class TaskManager {
void addTask(final Task task) {
tasks.addTask(task);
}
-
- TasksRegistry tasks() {
- return tasks;
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index ac092d3b1c0..5c3f8aa90d0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -373,8 +373,8 @@ public class PauseResumeIntegrationTest {
private void assertStreamsLocalStoreLagStaysConstant(final KafkaStreams
streams) throws InterruptedException {
waitForCondition(
- () -> !streams.allLocalStorePartitionLags().isEmpty(),
- "Lags for local store partitions were not found within the
timeout!");
+ () ->
streams.allLocalStorePartitionLags().containsKey("test-store"),
+ "Lags for test-store partitions were not found within the
timeout!");
waitUntilStreamsHasPolled(streams, 2);
final long stateStoreLag1 =
streams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
waitUntilStreamsHasPolled(streams, 2);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 5ec697bce32..d5bb594cdaa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1452,7 +1452,7 @@ public class IntegrationTestUtils {
public static void waitUntilStreamsHasPolled(final KafkaStreams
kafkaStreams, final int pollNumber)
throws InterruptedException {
final Double initialCount = getStreamsPollNumber(kafkaStreams);
- retryOnExceptionWithTimeout(1000, () -> {
+ retryOnExceptionWithTimeout(10000, () -> {
assertThat(getStreamsPollNumber(kafkaStreams),
is(greaterThanOrEqualTo(initialCount + pollNumber)));
});
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 87ac1b762f2..ffc8524d655 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -55,6 +55,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -427,7 +428,17 @@ public class PartitionGroupTest {
@Test
public void shouldEmptyPartitionsOnClear() {
- final PartitionGroup group = getBasicGroup();
+ final PartitionGroup group =
+ new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1)
+ ),
+ tp -> OptionalLong.of(0L),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ 10
+ );
final List<ConsumerRecord<byte[], byte[]>> list = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
@@ -436,6 +447,7 @@ public class PartitionGroupTest {
group.addRawRecords(partition1, list);
group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
+ group.updateLags();
group.clear();
@@ -443,6 +455,7 @@ public class PartitionGroupTest {
assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN));
assertThat(group.nextRecord(new PartitionGroup.RecordInfo(),
time.milliseconds()), equalTo(null));
assertThat(group.partitionTimestamp(partition1),
equalTo(RecordQueue.UNKNOWN));
+ hasNoFetchedLag(group, partition1);
group.addRawRecords(partition1, list);
}
@@ -652,6 +665,7 @@ public class PartitionGroupTest {
);
}
lags.put(partition2, OptionalLong.of(0L));
+ group.updateLags();
assertThat(group.readyToProcess(0L), is(true));
}
@@ -676,6 +690,7 @@ public class PartitionGroupTest {
group.addRawRecords(partition1, list1);
lags.put(partition2, OptionalLong.of(1L));
+ group.updateLags();
assertThat(group.allPartitionsBufferedLocally(), is(false));
@@ -705,6 +720,7 @@ public class PartitionGroupTest {
enforcedProcessingSensor,
1L
);
+ group.updateLags();
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
@@ -764,6 +780,75 @@ public class PartitionGroupTest {
}
}
+ private void hasNoFetchedLag(final PartitionGroup group, final
TopicPartition partition) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ appender.setClassLoggerToTrace(PartitionGroup.class);
+ assertFalse(group.readyToProcess(0L));
+ assertThat(appender.getEvents(),
hasItem(Matchers.hasProperty("message",
+ equalTo(String.format("[test] Waiting to fetch data for %s",
partition)))));
+ }
+ }
+
+ private void hasZeroFetchedLag(final PartitionGroup group, final
TopicPartition partition) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ appender.setClassLoggerToTrace(PartitionGroup.class);
+ assertFalse(group.readyToProcess(0L));
+ assertThat(appender.getEvents(),
hasItem(Matchers.hasProperty("message",
+ startsWith(String.format("[test] Lag for %s is currently 0 and
current time is %d. "
+ + "Waiting for new data to be produced for configured idle
time", partition, 0L)))));
+ }
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ private void hasNonZeroFetchedLag(final PartitionGroup group, final
TopicPartition partition, final long lag) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ appender.setClassLoggerToTrace(PartitionGroup.class);
+ assertFalse(group.readyToProcess(0L));
+ assertThat(appender.getEvents(),
hasItem(Matchers.hasProperty("message",
+ equalTo(String.format("[test] Lag for %s is currently %d, but
no data is buffered locally. "
+ + "Waiting to buffer some records.", partition, lag)))));
+ }
+ }
+
+
+ @Test
+ public void shouldUpdateLags() {
+ final HashMap<TopicPartition, OptionalLong> lags = new HashMap<>();
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1)
+ ),
+ tp -> lags.getOrDefault(tp, OptionalLong.empty()),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ 10L
+ );
+
+ hasNoFetchedLag(group, partition1);
+
+ lags.put(partition1, OptionalLong.of(5));
+
+ hasNoFetchedLag(group, partition1);
+
+ group.updateLags();
+
+ hasNonZeroFetchedLag(group, partition1, 5);
+
+ lags.put(partition1, OptionalLong.of(0));
+ group.updateLags();
+
+ hasZeroFetchedLag(group, partition1);
+
+ lags.remove(partition1);
+
+ hasZeroFetchedLag(group, partition1);
+
+ group.updateLags();
+
+ hasNoFetchedLag(group, partition1);
+ }
+
private PartitionGroup getBasicGroup() {
return new PartitionGroup(
logContext,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 947a4c53c61..7ed11a61ece 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -521,6 +521,10 @@ public class StreamTaskTest {
@Test
public void shouldProcessInOrder() {
task = createStatelessTask(createConfig());
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { });
+
+ task.resumePollingForPartitionsWithAvailableSpace();
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 10, 101),
@@ -534,6 +538,8 @@ public class StreamTaskTest {
getConsumerRecordWithOffsetAsTimestamp(partition2, 45, 203)
));
+ task.updateLags();
+
assertTrue(task.process(0L));
assertEquals(5, task.numBuffered());
assertEquals(1, source1.numReceived);
@@ -958,6 +964,8 @@ public class StreamTaskTest {
@Test
public void shouldPauseAndResumeBasedOnBufferedRecords() {
task = createStatelessTask(createConfig("100"));
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
@@ -992,6 +1000,12 @@ public class StreamTaskTest {
assertEquals(2, source1.numReceived);
assertEquals(0, source2.numReceived);
+ assertEquals(2, consumer.paused().size());
+ assertTrue(consumer.paused().contains(partition1));
+ assertTrue(consumer.paused().contains(partition2));
+
+ task.resumePollingForPartitionsWithAvailableSpace();
+
assertEquals(1, consumer.paused().size());
assertTrue(consumer.paused().contains(partition2));
@@ -1006,6 +1020,11 @@ public class StreamTaskTest {
assertEquals(3, source1.numReceived);
assertEquals(1, source2.numReceived);
+ assertEquals(1, consumer.paused().size());
+ assertTrue(consumer.paused().contains(partition2));
+
+ task.resumePollingForPartitionsWithAvailableSpace();
+
assertEquals(0, consumer.paused().size());
}
@@ -1015,6 +1034,8 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
+ task.resumePollingForPartitionsWithAvailableSpace();
+
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
getConsumerRecordWithOffsetAsTimestamp(partition1, 142),
@@ -1029,6 +1050,8 @@ public class StreamTaskTest {
getConsumerRecordWithOffsetAsTimestamp(partition2, 161)
));
+ task.updateLags();
+
// st: -1
assertFalse(task.canPunctuateStreamTime());
assertFalse(task.maybePunctuateStreamTime()); // punctuate at 20
@@ -1263,7 +1286,9 @@ public class StreamTaskTest {
// the task should still be committed since the processed records have
not reached the consumer position
assertTrue(task.commitNeeded());
+ task.resumePollingForPartitionsWithAvailableSpace();
consumer.poll(Duration.ZERO);
+ task.updateLags();
task.process(0L);
assertTrue(task.commitNeeded());
@@ -1284,6 +1309,8 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
+ task.resumePollingForPartitionsWithAvailableSpace();
+
consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1,
0L));
consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1,
1L));
consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition2,
0L));
@@ -1293,6 +1320,8 @@ public class StreamTaskTest {
task.addRecords(partition1,
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0L)));
task.addRecords(partition1,
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 1L)));
+ task.updateLags();
+
task.process(0L);
processorStreamTime.mockProcessor.addProcessorMetadata("key1", 100L);
@@ -1820,6 +1849,9 @@ public class StreamTaskTest {
task.addRecords(partition1,
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 5L)));
task.addRecords(repartition,
singletonList(getConsumerRecordWithOffsetAsTimestamp(repartition, 10L)));
+ task.resumePollingForPartitionsWithAvailableSpace();
+ task.updateLags();
+
assertTrue(task.process(0L));
assertTrue(task.process(0L));
@@ -2508,8 +2540,10 @@ public class StreamTaskTest {
getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
consumer.addRecord(records.get(0));
consumer.addRecord(records.get(1));
+ task.resumePollingForPartitionsWithAvailableSpace();
consumer.poll(Duration.ZERO);
task.addRecords(partition1, records);
+ task.updateLags();
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
@@ -2538,8 +2572,10 @@ public class StreamTaskTest {
getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
consumer.addRecord(records.get(0));
consumer.addRecord(records.get(1));
+ task.resumePollingForPartitionsWithAvailableSpace();
consumer.poll(Duration.ZERO);
task.addRecords(partition1, records);
+ task.updateLags();
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
@@ -2565,8 +2601,10 @@ public class StreamTaskTest {
getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
consumer.addRecord(records.get(0));
consumer.addRecord(records.get(1));
+ task.resumePollingForPartitionsWithAvailableSpace();
consumer.poll(Duration.ZERO);
task.addRecords(partition1, records);
+ task.updateLags();
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
@@ -2607,6 +2645,7 @@ public class StreamTaskTest {
verify(recordCollector, never()).offsets();
}
+
private ProcessorStateManager mockStateManager() {
final ProcessorStateManager manager =
mock(ProcessorStateManager.class);
doReturn(TaskType.ACTIVE).when(manager).taskType();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1784a5335c8..9cda974cb95 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -90,6 +90,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@@ -3143,6 +3144,35 @@ public class StreamThreadTest {
Mockito.verify(taskManager,
times(2)).checkStateUpdater(Mockito.anyLong(), Mockito.any());
}
+ @Test
+ public void shouldUpdateLagsAfterPolling() {
+ final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();
+ final StreamThread streamThread = setUpThread(streamsConfigProps);
+ streamThread.setState(State.STARTING);
+ streamThread.setState(State.PARTITIONS_ASSIGNED);
+
+ streamThread.runOnce();
+
+ final InOrder inOrder = Mockito.inOrder(mainConsumer,
streamThread.taskManager());
+ inOrder.verify(mainConsumer).poll(Mockito.any());
+ inOrder.verify(streamThread.taskManager()).updateLags();
+ }
+
+
+ @Test
+ public void
shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling() {
+ final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();
+ final StreamThread streamThread = setUpThread(streamsConfigProps);
+ streamThread.setState(State.STARTING);
+ streamThread.setState(State.PARTITIONS_ASSIGNED);
+
+ streamThread.runOnce();
+
+ final InOrder inOrder = Mockito.inOrder(streamThread.taskManager(),
mainConsumer);
+
inOrder.verify(streamThread.taskManager()).resumePollingForPartitionsWithAvailableSpace();
+ inOrder.verify(mainConsumer).poll(Mockito.any());
+ }
+
@Test
public void
shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b00102810c7..705bea18d55 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -286,6 +286,42 @@ public class TaskManagerTest {
Mockito.verify(standbyTask).resume();
}
+ @Test
+ public void
shouldResumePollingForPartitionsWithAvailableSpaceForAllActiveTasks() {
+ final StreamTask activeTask1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StreamTask activeTask2 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ when(tasks.activeTasks()).thenReturn(mkSet(activeTask1, activeTask2));
+
+ taskManager.resumePollingForPartitionsWithAvailableSpace();
+
+
Mockito.verify(activeTask1).resumePollingForPartitionsWithAvailableSpace();
+
Mockito.verify(activeTask2).resumePollingForPartitionsWithAvailableSpace();
+ }
+
+ @Test
+ public void shouldUpdateLagForAllActiveTasks() {
+ final StreamTask activeTask1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StreamTask activeTask2 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
+ final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ when(tasks.activeTasks()).thenReturn(mkSet(activeTask1, activeTask2));
+
+ taskManager.updateLags();
+
+ Mockito.verify(activeTask1).updateLags();
+ Mockito.verify(activeTask2).updateLags();
+ }
+
@Test
public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
final StreamTask activeTaskToRecycle = statefulTask(taskId03,
taskId03ChangelogPartitions)
@@ -4835,6 +4871,16 @@ public class TaskManagerTest {
transitionTo(State.CLOSED);
}
+ @Override
+ public void resumePollingForPartitionsWithAvailableSpace() {
+ // noop
+ }
+
+ @Override
+ public void updateLags() {
+ // noop
+ }
+
@Override
public void updateInputPartitions(final Set<TopicPartition>
topicPartitions, final Map<String, List<String>>
allTopologyNodesToSourceTopics) {
inputPartitions = topicPartitions;
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 14123c36404..f1ac8249d23 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -596,6 +596,8 @@ public class TopologyTestDriver implements Closeable {
// If the topology only has global tasks, then `task` would be null.
// For this method, it just means there's nothing to do.
if (task != null) {
+ task.resumePollingForPartitionsWithAvailableSpace();
+ task.updateLags();
while (task.hasRecordsQueued() &&
task.isProcessable(mockWallClockTime.milliseconds())) {
// Process the record ...
task.process(mockWallClockTime.milliseconds());