This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06c92131356 KAFKA-19682: Add logs for Kafka Streams task readiness (#20693)
06c92131356 is described below

commit 06c9213135665b26f5aa323f12fb09de1b5b6c00
Author: lucliu1108 <[email protected]>
AuthorDate: Wed Nov 12 13:05:23 2025 -0600

    KAFKA-19682: Add logs for Kafka Streams task readiness (#20693)
    
    Add more debug/trace logs following the logic of how the TaskExecutor
    processes tasks.
    
    Reviewers: Matthias J. Sax <[email protected]>
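
    [Editor's note] The core pattern in this commit is interval-throttled
    "not ready" logging: on the READY -> NOT_READY transition a timestamp is
    recorded, and while the task stays not-ready a diagnostic message is
    emitted at most once per interval. A minimal standalone sketch of that
    pattern (hypothetical names, not the committed code):

        import java.util.Optional;

        final class NotReadyLogThrottle {
            private static final long LOG_INTERVAL_MS = 120_000L; // 2 minutes, as in the commit

            private Optional<Long> lastNotReadyLogTime = Optional.empty();

            // Returns the message to log now, or empty if ready or throttled.
            Optional<String> onCheck(final boolean ready, final long nowMs, final String reason) {
                if (ready) {
                    lastNotReadyLogTime = Optional.empty(); // READY clears the timer
                    return Optional.empty();
                }
                if (lastNotReadyLogTime.isEmpty()) {
                    lastNotReadyLogTime = Optional.of(nowMs); // READY -> NOT_READY starts the timer
                    return Optional.empty();
                }
                if (nowMs - lastNotReadyLogTime.get() >= LOG_INTERVAL_MS) {
                    lastNotReadyLogTime = Optional.of(nowMs); // reset, then log again next interval
                    return Optional.of(reason);
                }
                return Optional.empty();
            }
        }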
---
 .../internals/AbstractPartitionGroup.java          | 29 ++++++-
 .../processor/internals/PartitionGroup.java        | 65 ++++++++------
 .../streams/processor/internals/StreamTask.java    | 57 +++++++++++--
 .../internals/SynchronizedPartitionGroup.java      |  2 +-
 .../processor/internals/TaskExecutionMetadata.java | 60 ++++++++++++-
 .../processor/internals/PartitionGroupTest.java    | 99 +++++++++-------------
 .../processor/internals/StreamTaskTest.java        | 65 ++++++++++++++
 .../internals/SynchronizedPartitionGroupTest.java  |  3 +-
 8 files changed, 281 insertions(+), 99 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
index 1dadbf496a2..76b5f596f94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java
@@ -25,7 +25,34 @@ import java.util.function.Function;
 
 abstract class AbstractPartitionGroup {
 
-    abstract boolean readyToProcess(long wallClockTime);
+    /**
+     * Result of readyToProcess check, containing both the readiness status
+     * and an optional diagnostic log message.
+     */
+    static final class ReadyToProcessResult {
+        private final boolean ready;
+        private final Optional<String> logMessage;
+
+        ReadyToProcessResult(final boolean ready, final Optional<String> logMessage) {
+            if (ready && logMessage.isPresent()) {
+                throw new IllegalArgumentException("Invalid ReadyToProcessResult: 'ready' is true but a log message is present.");
+            } else if (!ready && logMessage.isEmpty()) {
+                throw new IllegalArgumentException("Invalid ReadyToProcessResult: 'ready' is false but no log message is provided.");
+            }
+            this.ready = ready;
+            this.logMessage = logMessage;
+        }
+
+        boolean isReady() {
+            return ready;
+        }
+
+        Optional<String> getLogMessage() {
+            return logMessage;
+        }
+    }
+
+    abstract ReadyToProcessResult readyToProcess(long wallClockTime);
 
     // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
     abstract void updatePartitions(Set<TopicPartition> inputPartitions, Function<TopicPartition, RecordQueue> recordQueueCreator);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 5fb313ff605..95c5fc29f25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -93,7 +93,7 @@ class PartitionGroup extends AbstractPartitionGroup {
     }
 
     @Override
-    boolean readyToProcess(final long wallClockTime) {
+    ReadyToProcessResult readyToProcess(final long wallClockTime) {
         if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
             if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
                 final Set<TopicPartition> bufferedPartitions = new HashSet<>();
@@ -112,40 +112,40 @@ class PartitionGroup extends AbstractPartitionGroup {
                         bufferedPartitions,
                         emptyPartitions);
             }
-            return true;
+            return new ReadyToProcessResult(true, Optional.empty());
         }
 
         final Set<TopicPartition> queued = new HashSet<>();
         Map<TopicPartition, Long> enforced = null;
+        final StringBuilder logMessageBuilder = new StringBuilder();
 
         for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
 
-
             if (!queue.isEmpty()) {
                 // this partition is ready for processing
+                logger.trace("Partition {} has buffered data, ready for processing", partition);
                 idlePartitionDeadlines.remove(partition);
                 queued.add(partition);
             } else {
                 final Long fetchedLag = fetchedLags.getOrDefault(partition, -1L);
-
-                logger.trace("Fetched lag for {} is {}", partition, fetchedLag);
-
+                appendLog(logMessageBuilder, String.format("Partition %s has fetched lag of %d", partition, fetchedLag));
+
                 if (fetchedLag == -1L) {
                     // must wait to fetch metadata for the partition
                     idlePartitionDeadlines.remove(partition);
-                    logger.trace("Waiting to fetch data for {}", partition);
-                    return false;
+                    appendLog(logMessageBuilder, String.format("\tWaiting to fetch data for %s", partition));
+
+                    return new ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
                 } else if (fetchedLag > 0L) {
                     // must wait to poll the data we know to be on the broker
                     idlePartitionDeadlines.remove(partition);
-                    logger.trace(
-                            "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
-                            partition,
-                            fetchedLag
-                    );
-                    return false;
+                    appendLog(logMessageBuilder,
+                        String.format("Partition %s has current lag %d, but no data is buffered locally. Waiting to buffer some records.",
+                        partition, fetchedLag));
+
+                    return new ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
                 } else {
                     // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
                     // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
@@ -156,16 +156,15 @@ class PartitionGroup extends AbstractPartitionGroup {
                     idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
                     final long deadline = idlePartitionDeadlines.get(partition);
                     if (wallClockTime < deadline) {
-                        logger.trace(
-                                "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
-                                partition,
-                                wallClockTime,
-                                maxTaskIdleMs,
-                                deadline
-                        );
-                        return false;
+                        appendLog(logMessageBuilder, String.format(
+                            "Partition %s has current lag 0 and current time is %d. " +
+                                "Waiting for new data to be produced for configured idle time %d (deadline is %d).",
+                            partition, wallClockTime, maxTaskIdleMs, deadline));
+
+                        return new ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
                     } else {
                         // this partition is ready for processing due to the task idling deadline passing
+                        logger.trace("Partition {} is ready for processing due to the task idling deadline passing", partition);
                         if (enforced == null) {
                             enforced = new HashMap<>();
                         }
@@ -176,10 +175,10 @@ class PartitionGroup extends AbstractPartitionGroup {
         }
         if (enforced == null) {
             logger.trace("All partitions were buffered locally, so this task is ready for processing.");
-            return true;
+            return new ReadyToProcessResult(true, Optional.empty());
         } else if (queued.isEmpty()) {
-            logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
-            return false;
+            appendLog(logMessageBuilder, "No partitions were buffered locally, so this task is not ready for processing.");
+            return new ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
         } else {
             enforcedProcessingSensor.record(1.0d, wallClockTime);
             logger.trace("Continuing to process although some partitions are empty on the broker." +
@@ -192,7 +191,7 @@ class PartitionGroup extends AbstractPartitionGroup {
                     enforced,
                     maxTaskIdleMs,
                     wallClockTime);
-            return true;
+            return new ReadyToProcessResult(true, Optional.empty());
         }
     }
 
@@ -255,24 +254,30 @@ class PartitionGroup extends AbstractPartitionGroup {
 
             if (record != null) {
                 totalBuffered -= oldSize - queue.size();
+                logger.trace("Partition {} polling next record: oldSize={}, newSize={}, totalBuffered={}, recordTimestamp={}",
+                    queue.partition(), oldSize, queue.size(), totalBuffered, record.timestamp);
 
                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag
                     allBuffered = false;
+                    logger.trace("Partition {} queue is now empty, allBuffered=false", queue.partition());
                 } else {
                     nonEmptyQueuesByTime.offer(queue);
                 }
 
                 // always update the stream-time to the record's timestamp yet to be processed if it is larger
                 if (record.timestamp > streamTime) {
+                    final long oldStreamTime = streamTime;
                     streamTime = record.timestamp;
                     recordLatenessSensor.record(0, wallClockTime);
+                    logger.trace("Partition {} stream time updated from {} to {}", queue.partition(), oldStreamTime, streamTime);
                 } else {
                     recordLatenessSensor.record(streamTime - record.timestamp, wallClockTime);
                 }
             }
+        } else {
+            logger.trace("Partition pulling nextRecord: no queue available");
         }
-
         return record;
     }
 
@@ -393,4 +398,10 @@ class PartitionGroup extends AbstractPartitionGroup {
         }
     }
 
+    private void appendLog(final StringBuilder sb, final String msg) {
+        if (sb.length() > 0) {
+            sb.append("\n");
+        }
+        sb.append(msg);
+    }
 }
\ No newline at end of file
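
[Editor's note] The ReadyToProcessResult constructor enforces that a log
message is present exactly when ready == false, so callers can rely on
orElseThrow(). A hedged sketch of how a caller consumes the new return type
(hypothetical helper based on the StreamTask hunk below, not additional
committed code):

    // logIfNotReady is a hypothetical helper; the logger type is org.slf4j.Logger.
    static void logIfNotReady(final AbstractPartitionGroup.ReadyToProcessResult result,
                              final org.slf4j.Logger logger) {
        if (!result.isReady()) {
            // Never throws: the constructor invariant guarantees a message when not ready.
            logger.info(result.getLogMessage().orElseThrow());
        }
    }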
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 895eac6c321..443ad5b0e9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -118,7 +118,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
     private boolean hasPendingTxCommit = false;
-    private Optional<Long> timeCurrentIdlingStarted;
+    private Optional<Long> timeCurrentIdlingStarted; // time when the task started idling; empty if not idling
+    private Optional<Long> lastNotReadyLogTime;   // last time we logged that the task was not ready to process
+    private static final long NOT_READY_LOG_INTERVAL_MS = 120_000L; // interval between "not ready" log lines (2 minutes)
 
     @SuppressWarnings({"rawtypes", "this-escape", "checkstyle:ParameterNumber"})
     public StreamTask(final TaskId id,
@@ -226,6 +228,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             highWatermark.put(topicPartition, -1L);
         }
         timeCurrentIdlingStarted = Optional.empty();
+        lastNotReadyLogTime = Optional.empty();
         processingExceptionHandler = config.processingExceptionHandler;
     }
 
@@ -239,6 +242,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return partitionQueues;
     }
 
+    // For testing only
+    void setLastNotReadyLogTime(final long lastNotReadyLogTime) {
+        this.lastNotReadyLogTime = Optional.of(lastNotReadyLogTime);
+    }
+
     @Override
     public boolean isActive() {
         return true;
@@ -416,6 +424,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " while resuming active task " + id);
         }
         timeCurrentIdlingStarted = Optional.empty();
+        lastNotReadyLogTime = Optional.empty();
     }
 
     public void flush() {
@@ -445,7 +454,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                     // TODO: this should be removed after we decouple caching with emitting
                     flush();
                     if (!clean) {
-                        log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id);
+                        log.debug("Skipped preparing {} task for commit since the task is getting closed dirty.", state());
                         return null;
                     }
                     hasPendingTxCommit = eosEnabled;
@@ -725,25 +734,49 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying the thread since the consumer
             // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime.isEmpty();
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the commit but not process any records
             // thus, the task is not processable, even if there is available data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, "Task is not ready to process: has pending transaction commit");
+            }
             return false;
         }
-        final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
-        if (!readyToProcess) {
-            if (timeCurrentIdlingStarted.isEmpty()) {
+
+        final AbstractPartitionGroup.ReadyToProcessResult readyToProcess = partitionGroup.readyToProcess(wallClockTime);
+        if (!readyToProcess.isReady()) {
+            if (wasReady) {
+                // READY -> NOT_READY - start the timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
                 timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, readyToProcess.getLogMessage().orElseThrow());
             }
         } else {
+            // Task is ready - clear the timer
+            lastNotReadyLogTime = Optional.empty();
             timeCurrentIdlingStarted = Optional.empty();
+            log.trace("Task is ready to process");
+        }
+        return readyToProcess.isReady();
+    }
+
+    private void maybeLogNotReady(final long wallClockTime, final String logMessage) {
+        // NOT_READY - check if it should log
+        final long timeSinceLastLog = lastNotReadyLogTime.map(aLong -> wallClockTime - aLong).orElse(-1L);
+        if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+            log.info(logMessage);
+            lastNotReadyLogTime = Optional.of(wallClockTime);
         }
-        return readyToProcess;
     }
 
     /**
@@ -764,7 +797,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             // if there is no record to process, return immediately
             if (record == null) {
+                log.trace("Task has no next record to process.");
                 return false;
+            } else {
+                log.trace("Task fetched one record {} to process.", record);
             }
         }
 
@@ -773,15 +809,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             if (!(record instanceof CorruptedRecord)) {
                 doProcess(wallClockTime);
+            } else {
+                log.trace("Task skips processing corrupted record {}", record);
             }
 
             // update the consumed offset map after processing is done
             consumedOffsets.put(partition, record.offset());
             commitNeeded = true;
 
+            log.trace("Task processed record: topic={}, partition={}, offset={}",
+                record.topic(), record.partition(), record.offset());
+
             // after processing this record, if its partition queue's buffered size has been
             // decreased to the threshold, we can then resume the consumption on this partition
             if (recordInfo.queue().size() <= maxBufferedSize) {
+                log.trace("Resume consumption for partition {}: buffered size {} is under the threshold {}",
+                        partition, recordInfo.queue().size(), maxBufferedSize);
                 partitionsToResume.add(partition);
             }
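
[Editor's note] A concrete timeline for the throttling above, reusing the
NotReadyLogThrottle sketch from the commit-message note (hypothetical driver,
not part of the commit): a task that turns not-ready at t=0 stays silent
until t=120s, logs once, then stays silent until the next interval elapses.

    public final class ThrottleDemo {
        public static void main(final String[] args) {
            final NotReadyLogThrottle throttle = new NotReadyLogThrottle();
            throttle.onCheck(false, 0L, "waiting for lag");          // transition: timer starts, no log
            System.out.println(throttle.onCheck(false, 119_999L, "waiting for lag")); // Optional.empty (throttled)
            System.out.println(throttle.onCheck(false, 120_000L, "waiting for lag")); // Optional[waiting for lag]
            throttle.onCheck(true, 120_001L, "");                    // ready again: timer cleared
        }
    }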
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
index cee6442b663..86ef00bc88e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java
@@ -32,7 +32,7 @@ class SynchronizedPartitionGroup extends AbstractPartitionGroup {
     }
 
     @Override
-    synchronized boolean readyToProcess(final long wallClockTime) {
+    synchronized ReadyToProcessResult readyToProcess(final long wallClockTime) {
         return wrapped.readyToProcess(wallClockTime);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
index 86001ba4130..0e2f5244649 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -37,6 +37,7 @@ import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
 public class TaskExecutionMetadata {
     // TODO: implement exponential backoff, for now we just wait 5s
     private static final long CONSTANT_BACKOFF_MS = 5_000L;
+    private static final long NOT_READY_LOG_INTERVAL_MS = 120_000L; // interval between "not ready" log lines (2 minutes)
 
     private final boolean hasNamedTopologies;
     private final Set<String> pausedTopologies;
@@ -44,6 +45,9 @@ public class TaskExecutionMetadata {
     private final Collection<Task> successfullyProcessed = new HashSet<>();
     // map of topologies experiencing errors/currently under backoff
     private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+    // map of task to last not-ready log time (absent means ready; a value >= 0 means not ready and tracks the last log time)
+    private final ConcurrentHashMap<TaskId, Long> taskToLastNotReadyLogTime = new ConcurrentHashMap<>();
+    private final Logger log;
 
     public TaskExecutionMetadata(final Set<String> allTopologyNames,
                                  final Set<String> pausedTopologies,
@@ -51,6 +55,7 @@ public class TaskExecutionMetadata {
         this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
         this.pausedTopologies = pausedTopologies;
         this.processingMode = processingMode;
+        this.log = new LogContext("").logger(TaskExecutionMetadata.class);
     }
 
     public boolean hasNamedTopologies() {
@@ -63,17 +68,61 @@ public class TaskExecutionMetadata {
 
     public boolean canProcessTask(final Task task, final long now) {
         final String topologyName = task.id().topologyName();
+        final boolean taskWasReady = !taskToLastNotReadyLogTime.containsKey(task.id());
+        final boolean canProcess;
+        final String logMessage;
+
         if (!hasNamedTopologies) {
             // TODO implement error handling/backoff for non-named topologies (needs KIP)
-            return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
+            canProcess = !pausedTopologies.contains(UNNAMED_TOPOLOGY);
+            if (canProcess) {
+                logMessage = String.format("Task %s can be processed: topology is not paused", task.id());
+            } else {
+                logMessage = String.format("Task %s can't be processed: topology is paused", task.id());
+            }
         } else {
             if (pausedTopologies.contains(topologyName)) {
-                return false;
+                canProcess = false;
+                logMessage = String.format("Task %s can't be processed: topology '%s' is paused", task.id(), topologyName);
             } else {
                 final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
-                return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+                canProcess = metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+                if (canProcess) {
+                    logMessage = String.format("Task %s can be processed for named topology '%s'", task.id(), topologyName);
+                } else {
+                    logMessage = String.format("Task %s can't be processed for named topology '%s'", task.id(), topologyName);
+                }
             }
         }
+        
+        if (!canProcess) {
+            if (taskWasReady) {
+                // READY -> NOT_READY - start timer
+                taskToLastNotReadyLogTime.put(task.id(), now);
+            } else {
+                // NOT_READY - check if it should log
+                maybeLogNotReady(task.id(), now, logMessage);
+            }
+        } else {
+            // Task is ready - clear the timer
+            taskToLastNotReadyLogTime.remove(task.id());
+            log.trace(logMessage);
+        }
+        
+        return canProcess;
+    }
+    
+    private void maybeLogNotReady(final TaskId taskId, final long now, final String logMessage) {
+        final Long lastLogTime = taskToLastNotReadyLogTime.get(taskId);
+        if (lastLogTime == null) {
+            return;
+        }
+
+        final long timeSinceLastLog = now - lastLogTime;
+        if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+            log.info("Task {} is not ready to process: {}", taskId, logMessage != null ? logMessage : "Task cannot be processed");
+            taskToLastNotReadyLogTime.put(taskId, now);
+        }
     }
 
     public boolean canPunctuateTask(final Task task) {
@@ -109,6 +158,11 @@ public class TaskExecutionMetadata {
     void clearSuccessfullyProcessed() {
         successfullyProcessed.clear();
     }
+    
+    void removeTaskFromNotReadyTracking(final Task task) {
+        final TaskId taskId = task.id();
+        taskToLastNotReadyLogTime.remove(taskId);
+    }
 
     private class NamedTopologyMetadata {
         private final Logger log;
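
[Editor's note] The per-task variant above keeps the same READY/NOT_READY
state machine but stores timestamps in a ConcurrentHashMap keyed by TaskId,
where an absent key means "ready". A minimal generic sketch of that map-based
throttle (hypothetical class, assuming the same 2-minute interval):

    import java.util.concurrent.ConcurrentHashMap;

    final class PerTaskNotReadyThrottle<K> {
        private static final long LOG_INTERVAL_MS = 120_000L;
        private final ConcurrentHashMap<K, Long> lastNotReadyLogTime = new ConcurrentHashMap<>();

        // Returns true when a "not ready" line should be logged for this task now.
        boolean shouldLog(final K taskId, final boolean canProcess, final long nowMs) {
            if (canProcess) {
                lastNotReadyLogTime.remove(taskId); // absent key == ready
                return false;
            }
            final Long last = lastNotReadyLogTime.putIfAbsent(taskId, nowMs);
            if (last == null) {
                return false; // READY -> NOT_READY: start the timer, stay quiet
            }
            if (nowMs - last >= LOG_INTERVAL_MS) {
                lastNotReadyLogTime.put(taskId, nowMs);
                return true;
            }
            return false;
        }
    }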
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 3276a0894b3..9b8d0fe2f06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -60,7 +60,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -710,7 +709,9 @@ public class PartitionGroupTest {
         assertThat(group.allPartitionsBufferedLocally(), is(false));
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
             appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(0L), is(true));
+            final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+            assertTrue(result.isReady());
+            assertTrue(result.getLogMessage().isEmpty());
             assertThat(
                 appender.getEvents(),
                 hasItem(Matchers.allOf(
@@ -751,9 +752,12 @@ public class PartitionGroupTest {
         group.addRawRecords(partition2, list2);
 
         assertThat(group.allPartitionsBufferedLocally(), is(true));
+
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
             appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(0L), is(true));
+            final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+            assertTrue(result.isReady());
+            assertTrue(result.getLogMessage().isEmpty());
             assertThat(
                 appender.getEvents(),
                 hasItem(Matchers.allOf(
@@ -785,20 +789,13 @@ public class PartitionGroupTest {
         group.addRawRecords(partition1, list1);
 
         assertThat(group.allPartitionsBufferedLocally(), is(false));
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(0L), is(false));
-            assertThat(
-                appender.getEvents(),
-                hasItem(Matchers.allOf(
-                    Matchers.hasProperty("level", equalTo("TRACE")),
-                    Matchers.hasProperty("message", equalTo("[test] Waiting to fetch data for topic-2"))
-                ))
-            );
-        }
+        final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+        assertFalse(result.isReady());
+        assertTrue(result.getLogMessage().isPresent() &&
+            result.getLogMessage().get().contains("Waiting to fetch data for topic-2"));
         lags.put(partition2, OptionalLong.of(0L));
         group.updateLags();
-        assertThat(group.readyToProcess(0L), is(true));
+        assertTrue(group.readyToProcess(0L).isReady());
     }
 
     @Test
@@ -826,17 +823,10 @@ public class PartitionGroupTest {
 
         assertThat(group.allPartitionsBufferedLocally(), is(false));
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(0L), is(false));
-            assertThat(
-                appender.getEvents(),
-                hasItem(Matchers.allOf(
-                    Matchers.hasProperty("level", equalTo("TRACE")),
-                    Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records."))
-                ))
-            );
-        }
+        final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+        assertFalse(result.isReady());
+        assertTrue(result.getLogMessage().isPresent() &&
+            result.getLogMessage().get().contains("Partition topic-2 has current lag 1, but no data is buffered locally. Waiting to buffer some records."));
     }
 
     @Test
@@ -861,21 +851,16 @@ public class PartitionGroupTest {
 
         assertThat(group.allPartitionsBufferedLocally(), is(false));
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(0L), is(false));
-            assertThat(
-                appender.getEvents(),
-                hasItem(Matchers.allOf(
-                    Matchers.hasProperty("level", equalTo("TRACE")),
-                    Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1)."))
-                ))
-            );
-        }
+        final AbstractPartitionGroup.ReadyToProcessResult result1 = group.readyToProcess(0L);
+        assertFalse(result1.isReady());
+        assertTrue(result1.getLogMessage().isPresent() &&
+            result1.getLogMessage().get().contains("Partition topic-2 has current lag 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1)."));
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
             appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(1L), is(true));
+            final AbstractPartitionGroup.ReadyToProcessResult result2 = group.readyToProcess(1L);
+            assertTrue(result2.isReady());
+            assertTrue(result2.getLogMessage().isEmpty());
             assertThat(
                 appender.getEvents(),
                 hasItem(Matchers.allOf(
@@ -894,7 +879,9 @@ public class PartitionGroupTest {
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
             appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertThat(group.readyToProcess(2L), is(true));
+            final AbstractPartitionGroup.ReadyToProcessResult result3 = group.readyToProcess(2L);
+            assertTrue(result3.isReady());
+            assertTrue(result3.getLogMessage().isEmpty());
             assertThat(
                 appender.getEvents(),
                 hasItem(Matchers.allOf(
@@ -913,33 +900,27 @@ public class PartitionGroupTest {
     }
 
     private void hasNoFetchedLag(final PartitionGroup group, final TopicPartition partition) {
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertFalse(group.readyToProcess(0L));
-            assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message",
-                equalTo(String.format("[test] Waiting to fetch data for %s", partition)))));
-        }
+        final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+        assertFalse(result.isReady());
+        assertTrue(result.getLogMessage().isPresent() &&
+            result.getLogMessage().get().contains(String.format("Waiting to fetch data for %s", partition)));
     }
 
     private void hasZeroFetchedLag(final PartitionGroup group, final TopicPartition partition) {
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertFalse(group.readyToProcess(0L));
-            assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message",
-                startsWith(String.format("[test] Lag for %s is currently 0 and current time is %d. "
-                    + "Waiting for new data to be produced for configured idle time", partition, 0L)))));
-        }
+        final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+        assertFalse(result.isReady());
+        assertTrue(result.getLogMessage().isPresent() &&
+            result.getLogMessage().get().contains(String.format("Partition %s has current lag 0 and current time is %d. "
+            + "Waiting for new data to be produced for configured idle time", partition, 0L)));
     }
 
     @SuppressWarnings("SameParameterValue")
     private void hasNonZeroFetchedLag(final PartitionGroup group, final TopicPartition partition, final long lag) {
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            appender.setClassLogger(PartitionGroup.class, Level.TRACE);
-            assertFalse(group.readyToProcess(0L));
-            assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message",
-                equalTo(String.format("[test] Lag for %s is currently %d, but no data is buffered locally. "
-                    + "Waiting to buffer some records.", partition, lag)))));
-        }
+        final AbstractPartitionGroup.ReadyToProcessResult result = group.readyToProcess(0L);
+        assertFalse(result.isReady());
+        assertTrue(result.getLogMessage().isPresent() &&
+            result.getLogMessage().get().contains(String.format("Partition %s has current lag %d, but no data is buffered locally. "
+            + "Waiting to buffer some records.", partition, lag)));
     }
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 71fd00c77ce..a142ebb9d95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -77,6 +78,8 @@ import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
+import org.apache.logging.log4j.Level;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -121,10 +124,12 @@ import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
 import static org.apache.kafka.streams.processor.internals.Task.State.SUSPENDED;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.not;
@@ -1599,6 +1604,66 @@ public class StreamTaskTest {
         assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty());
     }
 
+    @Test
+    public void shouldLogNotReadyWhenStaleAfterThreshold() throws Exception {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+
+        task = createStatelessTask(createConfig("100"));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+
+        try (final LogCaptureAppender streamTaskAppender = LogCaptureAppender.createAndRegister(StreamTask.class);
+             final LogCaptureAppender partitionGroupAppender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+
+            // Enable TRACE logging for PartitionGroup to capture the "ready for processing" message
+            partitionGroupAppender.setClassLogger(PartitionGroup.class, Level.TRACE);
+
+            // Set lastNotReadyLogTime to 100 seconds ago
+            final long initialTime = time.milliseconds();
+            task.setLastNotReadyLogTime(initialTime - 100_000L);
+
+            // Advance time by 19.999 seconds
+            long newTime = time.milliseconds() + 19_999L;
+
+            // Should not trigger logging: the task has been stale for just under 120 seconds
+            assertFalse(task.isProcessable(newTime));
+            List<String> messages = streamTaskAppender.getMessages();
+            assertEquals(0, messages.size(), "No log message should be logged before 120 seconds");
+
+            // Advance time by 20 seconds
+            newTime = time.milliseconds() + 20_000L;
+
+            // Should trigger logging after being stale for 120 seconds
+            assertFalse(task.isProcessable(newTime));
+
+            // Validate INFO log from StreamTask about partition2 not being ready
+            messages = streamTaskAppender.getMessages();
+            final String expectedNotReadyMessage = "stream-thread [Test worker] task [0_0] Partition topic2-0 has fetched lag of -1\n\tWaiting to fetch data for topic2-0";
+            final String expectedReadyMessage = "Partition topic1-0 has buffered data, ready for processing";
+            assertThat("Should have logged not ready message", messages.size(), is(1));
+            assertThat(
+                streamTaskAppender.getEvents(),
+                hasItem(Matchers.allOf(
+                    Matchers.hasProperty("level", equalTo("INFO")),
+                    Matchers.hasProperty("message", equalTo(expectedNotReadyMessage))
+                ))
+            );
+            assertThat(messages.get(0), equalTo(expectedNotReadyMessage));
+
+            // Validate TRACE log from PartitionGroup about partition1 being ready
+            assertThat(
+                partitionGroupAppender.getEvents(),
+                hasItem(Matchers.allOf(
+                    Matchers.hasProperty("level", equalTo("TRACE")),
+                    Matchers.hasProperty("message", containsString(expectedReadyMessage))
+                ))
+            );
+        }
+    }
+
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         when(stateManager.taskId()).thenReturn(taskId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
index f11d9bec9fe..fd869db220b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java
@@ -27,6 +27,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 
@@ -59,7 +60,7 @@ public class SynchronizedPartitionGroupTest {
     @Test
     public void testReadyToProcess() {
         final long wallClockTime = 0L;
-        when(wrapped.readyToProcess(wallClockTime)).thenReturn(true);
+        when(wrapped.readyToProcess(wallClockTime)).thenReturn(new AbstractPartitionGroup.ReadyToProcessResult(true, Optional.empty()));
 
         synchronizedPartitionGroup.readyToProcess(wallClockTime);
 

