This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06c92131356 KAFKA-19682: Add logs for Kafka Streams task readiness
(#20693)
06c92131356 is described below
commit 06c9213135665b26f5aa323f12fb09de1b5b6c00
Author: lucliu1108 <[email protected]>
AuthorDate: Wed Nov 12 13:05:23 2025 -0600
KAFKA-19682: Add logs for Kafka Streams task readiness (#20693)
Add more debug/trace logs following the logic of how TaskExecutor
processes tasks.
Reviewers: Matthias J. Sax <[email protected]>
---
.../internals/AbstractPartitionGroup.java | 29 ++++++-
.../processor/internals/PartitionGroup.java | 65 ++++++++------
.../streams/processor/internals/StreamTask.java | 57 +++++++++++--
.../internals/SynchronizedPartitionGroup.java | 2 +-
.../processor/internals/TaskExecutionMetadata.java | 60 ++++++++++++-
.../processor/internals/PartitionGroupTest.java | 99 +++++++++-------------
.../processor/internals/StreamTaskTest.java | 65 ++++++++++++++
.../internals/SynchronizedPartitionGroupTest.java | 3 +-
8 files changed, 281 insertions(+), 99 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
index 1dadbf496a2..76b5f596f94 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
@@ -25,7 +25,34 @@ import java.util.function.Function;
abstract class AbstractPartitionGroup {
- abstract boolean readyToProcess(long wallClockTime);
+ /**
+ * Result of readyToProcess check, containing both the readiness status
+ * and an optional diagnostic log message.
+ */
+ static final class ReadyToProcessResult {
+ private final boolean ready;
+ private final Optional<String> logMessage;
+
+ ReadyToProcessResult(final boolean ready, final Optional<String>
logMessage) {
+ if (ready && logMessage.isPresent()) {
+ throw new IllegalArgumentException("Invalid
ReadyToProcessResult: 'ready' is true but a log message is present.");
+ } else if (!ready && logMessage.isEmpty()) {
+ throw new IllegalArgumentException("Invalid
ReadyToProcessResult: 'ready' is false but no log message is provided.");
+ }
+ this.ready = ready;
+ this.logMessage = logMessage;
+ }
+
+ boolean isReady() {
+ return ready;
+ }
+
+ Optional<String> getLogMessage() {
+ return logMessage;
+ }
+ }
+
+ abstract ReadyToProcessResult readyToProcess(long wallClockTime);
// creates queues for new partitions, removes old queues, saves cached
records for previously assigned partitions
abstract void updatePartitions(Set<TopicPartition> inputPartitions,
Function<TopicPartition, RecordQueue> recordQueueCreator);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 5fb313ff605..95c5fc29f25 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -93,7 +93,7 @@ class PartitionGroup extends AbstractPartitionGroup {
}
@Override
- boolean readyToProcess(final long wallClockTime) {
+ ReadyToProcessResult readyToProcess(final long wallClockTime) {
if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
final Set<TopicPartition> bufferedPartitions = new HashSet<>();
@@ -112,40 +112,40 @@ class PartitionGroup extends AbstractPartitionGroup {
bufferedPartitions,
emptyPartitions);
}
- return true;
+ return new ReadyToProcessResult(true, Optional.empty());
}
final Set<TopicPartition> queued = new HashSet<>();
Map<TopicPartition, Long> enforced = null;
+ final StringBuilder logMessageBuilder = new StringBuilder();
for (final Map.Entry<TopicPartition, RecordQueue> entry :
partitionQueues.entrySet()) {
final TopicPartition partition = entry.getKey();
final RecordQueue queue = entry.getValue();
-
if (!queue.isEmpty()) {
// this partition is ready for processing
+ logger.trace("Partition {} has buffered data, ready for
processing", partition);
idlePartitionDeadlines.remove(partition);
queued.add(partition);
} else {
final Long fetchedLag = fetchedLags.getOrDefault(partition,
-1L);
-
- logger.trace("Fetched lag for {} is {}", partition,
fetchedLag);
-
+ appendLog(logMessageBuilder, String.format("Partition %s has
fetched lag of %d", partition, fetchedLag));
+
if (fetchedLag == -1L) {
// must wait to fetch metadata for the partition
idlePartitionDeadlines.remove(partition);
- logger.trace("Waiting to fetch data for {}", partition);
- return false;
+ appendLog(logMessageBuilder, String.format("\tWaiting to
fetch data for %s", partition));
+
+ return new ReadyToProcessResult(false,
Optional.of(logMessageBuilder.toString()));
} else if (fetchedLag > 0L) {
// must wait to poll the data we know to be on the broker
idlePartitionDeadlines.remove(partition);
- logger.trace(
- "Lag for {} is currently {}, but no data is
buffered locally. Waiting to buffer some records.",
- partition,
- fetchedLag
- );
- return false;
+ appendLog(logMessageBuilder,
+ String.format("Partition %s has current lag %d, but no
data is buffered locally. Waiting to buffer some records.",
+ partition, fetchedLag));
+
+ return new ReadyToProcessResult(false,
Optional.of(logMessageBuilder.toString()));
} else {
// p is known to have zero lag. wait for maxTaskIdleMs to
see if more data shows up.
// One alternative would be to set the deadline to
nullableMetadata.receivedTimestamp + maxTaskIdleMs
@@ -156,16 +156,15 @@ class PartitionGroup extends AbstractPartitionGroup {
idlePartitionDeadlines.putIfAbsent(partition,
wallClockTime + maxTaskIdleMs);
final long deadline =
idlePartitionDeadlines.get(partition);
if (wallClockTime < deadline) {
- logger.trace(
- "Lag for {} is currently 0 and current time is
{}. Waiting for new data to be produced for configured idle time {} (deadline
is {}).",
- partition,
- wallClockTime,
- maxTaskIdleMs,
- deadline
- );
- return false;
+ appendLog(logMessageBuilder, String.format(
+ "Partition %s has current lag 0 and current time
is %d. " +
+ "Waiting for new data to be produced for
configured idle time %d (deadline is %d).",
+ partition, wallClockTime, maxTaskIdleMs,
deadline));
+
+ return new ReadyToProcessResult(false,
Optional.of(logMessageBuilder.toString()));
} else {
// this partition is ready for processing due to the
task idling deadline passing
+ logger.trace("Partition {} is ready for processing due
to the task idling deadline passing", partition);
if (enforced == null) {
enforced = new HashMap<>();
}
@@ -176,10 +175,10 @@ class PartitionGroup extends AbstractPartitionGroup {
}
if (enforced == null) {
logger.trace("All partitions were buffered locally, so this task
is ready for processing.");
- return true;
+ return new ReadyToProcessResult(true, Optional.empty());
} else if (queued.isEmpty()) {
- logger.trace("No partitions were buffered locally, so this task is
not ready for processing.");
- return false;
+ appendLog(logMessageBuilder, "No partitions were buffered locally,
so this task is not ready for processing.");
+ return new ReadyToProcessResult(false,
Optional.of(logMessageBuilder.toString()));
} else {
enforcedProcessingSensor.record(1.0d, wallClockTime);
logger.trace("Continuing to process although some partitions are
empty on the broker." +
@@ -192,7 +191,7 @@ class PartitionGroup extends AbstractPartitionGroup {
enforced,
maxTaskIdleMs,
wallClockTime);
- return true;
+ return new ReadyToProcessResult(true, Optional.empty());
}
}
@@ -255,24 +254,30 @@ class PartitionGroup extends AbstractPartitionGroup {
if (record != null) {
totalBuffered -= oldSize - queue.size();
+ logger.trace("Partition {} polling next record:, oldSize={},
newSize={}, totalBuffered={}, recordTimestamp={}",
+ queue.partition(), oldSize, queue.size(), totalBuffered,
record.timestamp);
if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
allBuffered = false;
+ logger.trace("Partition {} queue is now empty,
allBuffered=false", queue.partition());
} else {
nonEmptyQueuesByTime.offer(queue);
}
// always update the stream-time to the record's timestamp yet
to be processed if it is larger
if (record.timestamp > streamTime) {
+ final long oldStreamTime = streamTime;
streamTime = record.timestamp;
recordLatenessSensor.record(0, wallClockTime);
+ logger.trace("Partition {} stream time updated from {} to
{}", queue.partition(), oldStreamTime, streamTime);
} else {
recordLatenessSensor.record(streamTime - record.timestamp,
wallClockTime);
}
}
+ } else {
+ logger.trace("Partition pulling nextRecord: no queue available");
}
-
return record;
}
@@ -393,4 +398,10 @@ class PartitionGroup extends AbstractPartitionGroup {
}
}
+ private void appendLog(final StringBuilder sb, final String msg) {
+ if (sb.length() > 0) {
+ sb.append("\n");
+ }
+ sb.append(msg);
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 895eac6c321..443ad5b0e9e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -118,7 +118,9 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean hasPendingTxCommit = false;
- private Optional<Long> timeCurrentIdlingStarted;
+ private Optional<Long> timeCurrentIdlingStarted; // time since the task
started idling, empty if not idling
+ private Optional<Long> lastNotReadyLogTime; // last time logged about
not being ready to process
+ private static final long NOT_READY_LOG_INTERVAL_MS = 120_000L; // log
interval of not being ready (2 minutes)
@SuppressWarnings({"rawtypes", "this-escape",
"checkstyle:ParameterNumber"})
public StreamTask(final TaskId id,
@@ -226,6 +228,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
highWatermark.put(topicPartition, -1L);
}
timeCurrentIdlingStarted = Optional.empty();
+ lastNotReadyLogTime = Optional.empty();
processingExceptionHandler = config.processingExceptionHandler;
}
@@ -239,6 +242,11 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
return partitionQueues;
}
+ // For testing only
+ void setLastNotReadyLogTime(final long lastNotReadyLogTime) {
+ this.lastNotReadyLogTime = Optional.of(lastNotReadyLogTime);
+ }
+
@Override
public boolean isActive() {
return true;
@@ -416,6 +424,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
throw new IllegalStateException("Unknown state " + state() + "
while resuming active task " + id);
}
timeCurrentIdlingStarted = Optional.empty();
+ lastNotReadyLogTime = Optional.empty();
}
public void flush() {
@@ -445,7 +454,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// TODO: this should be removed after we decouple caching
with emitting
flush();
if (!clean) {
- log.debug("Skipped preparing {} task with id {} for
commit since the task is getting closed dirty.", state(), id);
+ log.debug("Skipped preparing {} task for commit since
the task is getting closed dirty.", state());
return null;
}
hasPendingTxCommit = eosEnabled;
@@ -725,25 +734,49 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// a task is only closing / closed when 1) task manager is
closing, 2) a rebalance is undergoing;
// in either case we can just log it and move on without notifying
the thread since the consumer
// would soon be updated to not return any records for this task
anymore.
- log.info("Stream task {} is already in {} state, skip processing
it.", id(), state());
+ log.info("Task is already in {} state, skip processing it.",
state());
return false;
}
+ final boolean wasReady = lastNotReadyLogTime.isEmpty();
if (hasPendingTxCommit) {
// if the task has a pending TX commit, we should just retry the
commit but not process any records
// thus, the task is not processable, even if there is available
data in the record queue
+ if (wasReady) {
+ // READY -> NOT_READY - start timer
+ lastNotReadyLogTime = Optional.of(wallClockTime);
+ } else {
+ maybeLogNotReady(wallClockTime, "Task is not ready to process:
has pending transaction commit");
+ }
return false;
}
- final boolean readyToProcess =
partitionGroup.readyToProcess(wallClockTime);
- if (!readyToProcess) {
- if (timeCurrentIdlingStarted.isEmpty()) {
+
+ final AbstractPartitionGroup.ReadyToProcessResult readyToProcess =
partitionGroup.readyToProcess(wallClockTime);
+ if (!readyToProcess.isReady()) {
+ if (wasReady) {
+ // READY -> NOT_READY - start the timer
+ lastNotReadyLogTime = Optional.of(wallClockTime);
timeCurrentIdlingStarted = Optional.of(wallClockTime);
+ } else {
+ maybeLogNotReady(wallClockTime,
readyToProcess.getLogMessage().orElseThrow());
}
} else {
+ // Task is ready - clear the timer
+ lastNotReadyLogTime = Optional.empty();
timeCurrentIdlingStarted = Optional.empty();
+ log.trace("Task is ready to process");
+ }
+ return readyToProcess.isReady();
+ }
+
+ private void maybeLogNotReady(final long wallClockTime, final String
logMessage) {
+ // NOT_READY - check if it should log
+ final long timeSinceLastLog = lastNotReadyLogTime.map(aLong ->
wallClockTime - aLong).orElse(-1L);
+ if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+ log.info(logMessage);
+ lastNotReadyLogTime = Optional.of(wallClockTime);
}
- return readyToProcess;
}
/**
@@ -764,7 +797,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// if there is no record to process, return immediately
if (record == null) {
+ log.trace("Task has no next record to process.");
return false;
+ } else {
+ log.trace("Task fetched one record {} to process.", record);
}
}
@@ -773,15 +809,22 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
if (!(record instanceof CorruptedRecord)) {
doProcess(wallClockTime);
+ } else {
+ log.trace("Task skips processing corrupted record {}", record);
}
// update the consumed offset map after processing is done
consumedOffsets.put(partition, record.offset());
commitNeeded = true;
+ log.trace("Task processed record: topic={}, partition={},
offset={}",
+ record.topic(), record.partition(), record.offset());
+
// after processing this record, if its partition queue's buffered
size has been
// decreased to the threshold, we can then resume the consumption
on this partition
if (recordInfo.queue().size() <= maxBufferedSize) {
+ log.trace("Resume consumption for partition {}: buffered size
{} is under the threshold {}",
+ partition, recordInfo.queue().size(), maxBufferedSize);
partitionsToResume.add(partition);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
index cee6442b663..86ef00bc88e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
@@ -32,7 +32,7 @@ class SynchronizedPartitionGroup extends
AbstractPartitionGroup {
}
@Override
- synchronized boolean readyToProcess(final long wallClockTime) {
+ synchronized ReadyToProcessResult readyToProcess(final long wallClockTime)
{
return wrapped.readyToProcess(wallClockTime);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
index 86001ba4130..0e2f5244649 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -37,6 +37,7 @@ import static
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
public class TaskExecutionMetadata {
// TODO: implement exponential backoff, for now we just wait 5s
private static final long CONSTANT_BACKOFF_MS = 5_000L;
+ private static final long NOT_READY_LOG_INTERVAL_MS = 120_000L; // Log
interval of not being ready (2 minutes)
private final boolean hasNamedTopologies;
private final Set<String> pausedTopologies;
@@ -44,6 +45,9 @@ public class TaskExecutionMetadata {
private final Collection<Task> successfullyProcessed = new HashSet<>();
// map of topologies experiencing errors/currently under backoff
private final ConcurrentHashMap<String, NamedTopologyMetadata>
topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+ // map of task to last not ready logging time (not present means ready, >=
0 means not ready and tracks last log time)
+ private final ConcurrentHashMap<TaskId, Long> taskToLastNotReadyLogTime =
new ConcurrentHashMap<>();
+ private final Logger log;
public TaskExecutionMetadata(final Set<String> allTopologyNames,
final Set<String> pausedTopologies,
@@ -51,6 +55,7 @@ public class TaskExecutionMetadata {
this.hasNamedTopologies = !(allTopologyNames.size() == 1 &&
allTopologyNames.contains(UNNAMED_TOPOLOGY));
this.pausedTopologies = pausedTopologies;
this.processingMode = processingMode;
+ this.log = new LogContext("").logger(TaskExecutionMetadata.class);
}
public boolean hasNamedTopologies() {
@@ -63,17 +68,61 @@ public class TaskExecutionMetadata {
public boolean canProcessTask(final Task task, final long now) {
final String topologyName = task.id().topologyName();
+ final boolean taskWasReady =
!taskToLastNotReadyLogTime.containsKey(task.id());
+ final boolean canProcess;
+ final String logMessage;
+
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies
(needs KIP)
- return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
+ canProcess = !pausedTopologies.contains(UNNAMED_TOPOLOGY);
+ if (canProcess) {
+ logMessage = String.format("Task %s can be processed: topology
is not paused", task.id());
+ } else {
+ logMessage = String.format("Task %s can't be processed:
topology is paused", task.id());
+ }
} else {
if (pausedTopologies.contains(topologyName)) {
- return false;
+ canProcess = false;
+ logMessage = String.format("Task %s can't be processed:
topology '%s' is paused", task.id(), topologyName);
} else {
final NamedTopologyMetadata metadata =
topologyNameToErrorMetadata.get(topologyName);
- return metadata == null || (metadata.canProcess() &&
metadata.canProcessTask(task, now));
+ canProcess = metadata == null || (metadata.canProcess() &&
metadata.canProcessTask(task, now));
+ if (canProcess) {
+ logMessage = String.format("Task %s can be processed for
named topology '%s'", task.id(), topologyName);
+ } else {
+ logMessage = String.format("Task %s can't be processed for
named topology '%s'", task.id(), topologyName);
+ }
}
}
+
+ if (!canProcess) {
+ if (taskWasReady) {
+ // READY -> NOT_READY - start timer
+ taskToLastNotReadyLogTime.put(task.id(), now);
+ } else {
+ // NOT_READY - check if it should log
+ maybeLogNotReady(task.id(), now, logMessage);
+ }
+ } else {
+ // Task is ready - clear the timer
+ taskToLastNotReadyLogTime.remove(task.id());
+ log.trace(logMessage);
+ }
+
+ return canProcess;
+ }
+
+ private void maybeLogNotReady(final TaskId taskId, final long now, final
String logMessage) {
+ final Long lastLogTime = taskToLastNotReadyLogTime.get(taskId);
+ if (lastLogTime == null) {
+ return;
+ }
+
+ final long timeSinceLastLog = now - lastLogTime;
+ if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+ log.info("Task {} is not ready to process: {}", taskId, logMessage
!= null ? logMessage : "Task cannot be processed");
+ taskToLastNotReadyLogTime.put(taskId, now);
+ }
}
public boolean canPunctuateTask(final Task task) {
@@ -109,6 +158,11 @@ public class TaskExecutionMetadata {
void clearSuccessfullyProcessed() {
successfullyProcessed.clear();
}
+
+ void removeTaskFromNotReadyTracking(final Task task) {
+ final TaskId taskId = task.id();
+ taskToLastNotReadyLogTime.remove(taskId);
+ }
private class NamedTopologyMetadata {
private final Logger log;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 3276a0894b3..9b8d0fe2f06 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -60,7 +60,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -710,7 +709,9 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(0L), is(true));
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertTrue(result.isReady());
+ assertTrue(result.getLogMessage().isEmpty());
assertThat(
appender.getEvents(),
hasItem(Matchers.allOf(
@@ -751,9 +752,12 @@ public class PartitionGroupTest {
group.addRawRecords(partition2, list2);
assertThat(group.allPartitionsBufferedLocally(), is(true));
+
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(0L), is(true));
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertTrue(result.isReady());
+ assertTrue(result.getLogMessage().isEmpty());
assertThat(
appender.getEvents(),
hasItem(Matchers.allOf(
@@ -785,20 +789,13 @@ public class PartitionGroupTest {
group.addRawRecords(partition1, list1);
assertThat(group.allPartitionsBufferedLocally(), is(false));
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(0L), is(false));
- assertThat(
- appender.getEvents(),
- hasItem(Matchers.allOf(
- Matchers.hasProperty("level", equalTo("TRACE")),
- Matchers.hasProperty("message", equalTo("[test] Waiting to
fetch data for topic-2"))
- ))
- );
- }
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertFalse(result.isReady());
+ assertTrue(result.getLogMessage().isPresent() &&
+ result.getLogMessage().get().contains("Waiting to fetch data for
topic-2"));
lags.put(partition2, OptionalLong.of(0L));
group.updateLags();
- assertThat(group.readyToProcess(0L), is(true));
+ assertTrue(group.readyToProcess(0L).isReady());
}
@Test
@@ -826,17 +823,10 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(0L), is(false));
- assertThat(
- appender.getEvents(),
- hasItem(Matchers.allOf(
- Matchers.hasProperty("level", equalTo("TRACE")),
- Matchers.hasProperty("message", equalTo("[test] Lag for
topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some
records."))
- ))
- );
- }
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertFalse(result.isReady());
+ assertTrue(result.getLogMessage().isPresent() &&
+ result.getLogMessage().get().contains("Partition topic-2 has
current lag 1, but no data is buffered locally. Waiting to buffer some
records."));
}
@Test
@@ -861,21 +851,16 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(0L), is(false));
- assertThat(
- appender.getEvents(),
- hasItem(Matchers.allOf(
- Matchers.hasProperty("level", equalTo("TRACE")),
- Matchers.hasProperty("message", equalTo("[test] Lag for
topic-2 is currently 0 and current time is 0. Waiting for new data to be
produced for configured idle time 1 (deadline is 1)."))
- ))
- );
- }
+ final AbstractPartitionGroup.ReadyToProcessResult result1 =
group.readyToProcess(0L);
+ assertFalse(result1.isReady());
+ assertTrue(result1.getLogMessage().isPresent() &&
+ result1.getLogMessage().get().contains("Partition topic-2 has
current lag 0 and current time is 0. Waiting for new data to be produced for
configured idle time 1 (deadline is 1)."));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(1L), is(true));
+ final AbstractPartitionGroup.ReadyToProcessResult result2 =
group.readyToProcess(1L);
+ assertTrue(result2.isReady());
+ assertTrue(result2.getLogMessage().isEmpty());
assertThat(
appender.getEvents(),
hasItem(Matchers.allOf(
@@ -894,7 +879,9 @@ public class PartitionGroupTest {
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertThat(group.readyToProcess(2L), is(true));
+ final AbstractPartitionGroup.ReadyToProcessResult result3 =
group.readyToProcess(2L);
+ assertTrue(result3.isReady());
+ assertTrue(result3.getLogMessage().isEmpty());
assertThat(
appender.getEvents(),
hasItem(Matchers.allOf(
@@ -913,33 +900,27 @@ public class PartitionGroupTest {
}
private void hasNoFetchedLag(final PartitionGroup group, final
TopicPartition partition) {
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertFalse(group.readyToProcess(0L));
- assertThat(appender.getEvents(),
hasItem(Matchers.hasProperty("message",
- equalTo(String.format("[test] Waiting to fetch data for %s",
partition)))));
- }
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertFalse(result.isReady());
+ assertTrue(result.getLogMessage().isPresent() &&
+ result.getLogMessage().get().contains(String.format("Waiting to
fetch data for %s", partition)));
}
private void hasZeroFetchedLag(final PartitionGroup group, final
TopicPartition partition) {
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertFalse(group.readyToProcess(0L));
- assertThat(appender.getEvents(),
hasItem(Matchers.hasProperty("message",
- startsWith(String.format("[test] Lag for %s is currently 0 and
current time is %d. "
- + "Waiting for new data to be produced for configured idle
time", partition, 0L)))));
- }
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertFalse(result.isReady());
+ assertTrue(result.getLogMessage().isPresent() &&
+ result.getLogMessage().get().contains(String.format("Partition %s
has current lag 0 and current time is %d. "
+ + "Waiting for new data to be produced for configured idle time",
partition, 0L)));
}
@SuppressWarnings("SameParameterValue")
private void hasNonZeroFetchedLag(final PartitionGroup group, final
TopicPartition partition, final long lag) {
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- appender.setClassLogger(PartitionGroup.class, Level.TRACE);
- assertFalse(group.readyToProcess(0L));
- assertThat(appender.getEvents(),
hasItem(Matchers.hasProperty("message",
- equalTo(String.format("[test] Lag for %s is currently %d, but
no data is buffered locally. "
- + "Waiting to buffer some records.", partition, lag)))));
- }
+ final AbstractPartitionGroup.ReadyToProcessResult result =
group.readyToProcess(0L);
+ assertFalse(result.isReady());
+ assertTrue(result.getLogMessage().isPresent() &&
+ result.getLogMessage().get().contains(String.format("Partition %s
has current lag %d, but no data is buffered locally. "
+ + "Waiting to buffer some records.", partition, lag)));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 71fd00c77ce..a142ebb9d95 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
@@ -77,6 +78,8 @@ import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -121,10 +124,12 @@ import static
org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
import static
org.apache.kafka.streams.processor.internals.Task.State.SUSPENDED;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static
org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.not;
@@ -1599,6 +1604,66 @@ public class StreamTaskTest {
assertThat("task is not idling",
task.timeCurrentIdlingStarted().isEmpty());
}
+ @Test
+ public void shouldLogNotReadyWhenStaleAfterThreshold() throws Exception {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+
+ task = createStatelessTask(createConfig("100"));
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { });
+
+ task.addRecords(partition1,
singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+
+ try (final LogCaptureAppender streamTaskAppender =
LogCaptureAppender.createAndRegister(StreamTask.class);
+ final LogCaptureAppender partitionGroupAppender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+
+ // Enable TRACE logging for PartitionGroup to capture the "ready
for processing" message
+ partitionGroupAppender.setClassLogger(PartitionGroup.class,
Level.TRACE);
+
+ // Set lastNotReadyLogTime to 100 seconds ago
+ final long initialTime = time.milliseconds();
+ task.setLastNotReadyLogTime(initialTime - 100_000L);
+
+ // Advance time by 19.999 seconds
+ long newTime = time.milliseconds() + 19_999L;
+
+ // Should not trigger logging after being stale for 119 seconds
+ assertFalse(task.isProcessable(newTime));
+ List<String> messages = streamTaskAppender.getMessages();
+ assertEquals(0, messages.size(), "No log message should be logged
before 120 seconds");
+
+ // Advance time by 20 seconds
+ newTime = time.milliseconds() + 20_000L;
+
+ // Should trigger logging after being stale for 120 seconds
+ assertFalse(task.isProcessable(newTime));
+
+ // Validate INFO log from StreamTask about partition2 not being
ready
+ messages = streamTaskAppender.getMessages();
+ final String expectedNotReadyMessage = "stream-thread [Test
worker] task [0_0] Partition topic2-0 has fetched lag of -1\n\tWaiting to fetch
data for topic2-0";
+ final String expectedReadyMessage = "Partition topic1-0 has
buffered data, ready for processing";
+ assertThat("Should have logged not ready message",
messages.size(), is(1));
+ assertThat(
+ streamTaskAppender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("INFO")),
+ Matchers.hasProperty("message",
equalTo(expectedNotReadyMessage))
+ ))
+ );
+ assertThat(messages.get(0), equalTo(expectedNotReadyMessage));
+
+ // Validate TRACE log from PartitionGroup about partition1 being
ready
+ assertThat(
+ partitionGroupAppender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("TRACE")),
+ Matchers.hasProperty("message",
containsString(expectedReadyMessage))
+ ))
+ );
+ }
+ }
+
@Test
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
when(stateManager.taskId()).thenReturn(taskId);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
index f11d9bec9fe..fd869db220b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
@@ -27,6 +27,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -59,7 +60,7 @@ public class SynchronizedPartitionGroupTest {
@Test
public void testReadyToProcess() {
final long wallClockTime = 0L;
- when(wrapped.readyToProcess(wallClockTime)).thenReturn(true);
+ when(wrapped.readyToProcess(wallClockTime)).thenReturn(new
AbstractPartitionGroup.ReadyToProcessResult(true, Optional.empty()));
synchronizedPartitionGroup.readyToProcess(wallClockTime);