This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 587f56593c6 KAFKA-14809 Fix logging conditional on WorkerSourceTask 
(#13386)
587f56593c6 is described below

commit 587f56593c66eebe18e0c1350593077f7d49c5e7
Author: Hector Geraldino <[email protected]>
AuthorDate: Thu Mar 16 08:39:31 2023 -0400

    KAFKA-14809 Fix logging conditional on WorkerSourceTask (#13386)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../apache/kafka/connect/runtime/WorkerSourceTask.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index ed366763a36..76d674d0fe7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -480,24 +480,24 @@ class WorkerSourceTask extends WorkerTask {
             this.committableOffsets = CommittableOffsets.EMPTY;
         }
 
-        if (committableOffsets.isEmpty()) {
-            log.info("{} Either no records were produced by the task since the 
last offset commit, " 
-                    + "or every record has been filtered out by a 
transformation " 
+        if (offsetsToCommit.isEmpty()) {
+            log.debug("{} Either no records were produced by the task since 
the last offset commit, " 
+                    + "or every record has been filtered out by a 
transformation "
                     + "or dropped due to transformation or conversion errors.",
                     this
             );
             // We continue with the offset commit process here instead of 
simply returning immediately
             // in order to invoke SourceTask::commit and record metrics for a 
successful offset commit
         } else {
-            log.info("{} Committing offsets for {} acknowledged messages", 
this, committableOffsets.numCommittableMessages());
-            if (committableOffsets.hasPending()) {
+            log.info("{} Committing offsets for {} acknowledged messages", 
this, offsetsToCommit.numCommittableMessages());
+            if (offsetsToCommit.hasPending()) {
                 log.debug("{} There are currently {} pending messages spread 
across {} source partitions whose offsets will not be committed. "
                                 + "The source partition with the most pending 
messages is {}, with {} pending messages",
                         this,
-                        committableOffsets.numUncommittableMessages(),
-                        committableOffsets.numDeques(),
-                        committableOffsets.largestDequePartition(),
-                        committableOffsets.largestDequeSize()
+                        offsetsToCommit.numUncommittableMessages(),
+                        offsetsToCommit.numDeques(),
+                        offsetsToCommit.largestDequePartition(),
+                        offsetsToCommit.largestDequeSize()
                 );
             } else {
                 log.debug("{} There are currently no pending messages for this 
offset commit; " 

Reply via email to