This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new eeafe0a101e MINOR: fix incorrect offset reset logging (#20558)
eeafe0a101e is described below

commit eeafe0a101eca851745508d3a440003fb8108de7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Sep 22 09:54:50 2025 -0700

    MINOR: fix incorrect offset reset logging (#20558)
    
    We only need to pass in the reset strategy, as the `logMessage`
    parameter was removed.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Lucas Brutschy
     <[email protected]>
---
 .../kafka/streams/processor/internals/StreamThread.java       | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ad66a822e72..f7f3676d24f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1524,6 +1524,7 @@ public class StreamThread extends Thread implements ProcessingThread {
         try {
             records = mainConsumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
+            log.info("Found no valid offset for {} partitions, resetting.", 
e.partitions().size());
             resetOffsets(e.partitions(), e);
         }
 
@@ -1598,14 +1599,14 @@ public class StreamThread extends Thread implements ProcessingThread {
                         addToResetList(
                             partition,
                             seekToBeginning,
-                            "Setting topic '{}' to consume from earliest 
offset",
+                            "Setting topic '{}' to consume from 'earliest' 
offset",
                             loggedTopics
                         );
                     } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
                         addToResetList(
                             partition,
                             seekToEnd,
-                            "Setting topic '{}' to consume from latest offset",
+                            "Setting topic '{}' to consume from 'latest' 
offset",
                             loggedTopics
                         );
                     } else if (resetPolicy.type() == 
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
@@ -1613,7 +1614,7 @@ public class StreamThread extends Thread implements ProcessingThread {
                             partition,
                             seekByDuration,
                             resetPolicy.duration().get(),
-                            "Setting topic '{}' to consume from 
by_duration:{}",
+                            "Setting topic '{}' to consume from 
'by_duration:{}'",
                             resetPolicy.duration().get().toString(),
                             loggedTopics
                         );
@@ -1729,12 +1730,12 @@ public class StreamThread extends Thread implements ProcessingThread {
     private void addToResetList(
         final TopicPartition partition,
         final Set<TopicPartition> partitions,
-        final String resetPolicy,
+        final String logMessage,
         final Set<String> loggedTopics
     ) {
         final String topic = partition.topic();
         if (loggedTopics.add(topic)) {
-            log.info("Setting topic '{}' to consume from {} offset", topic, 
resetPolicy);
+            log.info(logMessage, topic);
         }
         partitions.add(partition);
     }

Reply via email to