This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71397ad  MINOR: Log4j Improvements on Fetcher (#8629)
71397ad is described below

commit 71397adaff8fb4e9dd126c51cd38110cbd813936
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu May 7 12:03:43 2020 -0700

    MINOR: Log4j Improvements on Fetcher (#8629)
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java   | 6 ++++--
 .../java/org/apache/kafka/common/record/DefaultRecordBatch.java     | 2 ++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0699684..68c7347 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -676,13 +676,15 @@ public class Fetcher<K, V> implements Closeable {
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = 
completedFetch.fetchRecords(maxRecords);
 
+                log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
+                        partRecords.size(), position, 
completedFetch.partition);
+
                 if (completedFetch.nextFetchOffset > position.offset) {
                     SubscriptionState.FetchPosition nextPosition = new 
SubscriptionState.FetchPosition(
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Returning fetched records at offset {} for 
assigned partition {} and update " +
-                            "position to {}", position, 
completedFetch.partition, nextPosition);
+                    log.trace("Update fetching position to {} for partition 
{}", nextPosition, completedFetch.partition);
                     subscriptions.position(completedFetch.partition, 
nextPosition);
                 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index d4a9587..b49f2fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -484,6 +484,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     @Override
     public String toString() {
         return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + 
", " + lastOffset() + "], " +
+                "sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
+                "isTransactional=" + isTransactional() + ", isControlBatch=" + 
isControlBatch() + ", " +
                 "compression=" + compressionType() + ", timestampType=" + 
timestampType() + ", crc=" + checksum() + ")";
     }
 

Reply via email to