This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new eeafe0a101e MINOR: fix incorrect offset reset logging (#20558)
eeafe0a101e is described below
commit eeafe0a101eca851745508d3a440003fb8108de7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Sep 22 09:54:50 2025 -0700
MINOR: fix incorrect offset reset logging (#20558)
We only need to pass in the reset strategy, as the `logMessage`
parameter was removed.
Reviewers: Chia-Ping Tsai <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../kafka/streams/processor/internals/StreamThread.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ad66a822e72..f7f3676d24f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1524,6 +1524,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
try {
records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
+ log.info("Found no valid offset for {} partitions, resetting.",
e.partitions().size());
resetOffsets(e.partitions(), e);
}
@@ -1598,14 +1599,14 @@ public class StreamThread extends Thread implements
ProcessingThread {
addToResetList(
partition,
seekToBeginning,
- "Setting topic '{}' to consume from earliest
offset",
+ "Setting topic '{}' to consume from 'earliest'
offset",
loggedTopics
);
} else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
addToResetList(
partition,
seekToEnd,
- "Setting topic '{}' to consume from latest offset",
+ "Setting topic '{}' to consume from 'latest'
offset",
loggedTopics
);
} else if (resetPolicy.type() ==
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
@@ -1613,7 +1614,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
partition,
seekByDuration,
resetPolicy.duration().get(),
- "Setting topic '{}' to consume from
by_duration:{}",
+ "Setting topic '{}' to consume from
'by_duration:{}'",
resetPolicy.duration().get().toString(),
loggedTopics
);
@@ -1729,12 +1730,12 @@ public class StreamThread extends Thread implements
ProcessingThread {
private void addToResetList(
final TopicPartition partition,
final Set<TopicPartition> partitions,
- final String resetPolicy,
+ final String logMessage,
final Set<String> loggedTopics
) {
final String topic = partition.topic();
if (loggedTopics.add(topic)) {
- log.info("Setting topic '{}' to consume from {} offset", topic,
resetPolicy);
+ log.info(logMessage, topic);
}
partitions.add(partition);
}