This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 332e698  MINOR: improve log4j messaging (#4530)
332e698 is described below

commit 332e698ac9c74ce29317021b03a54512c92ac8b3
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon Feb 5 16:35:12 2018 -0800

    MINOR: improve log4j messaging (#4530)
    
    Reviewers: Matthias J. Sax <[email protected]>, James Cheng <[email protected]>
---
 .../org/apache/kafka/streams/errors/TaskMigratedException.java | 10 ++++++++++
 .../kafka/streams/processor/internals/PartitionGroup.java      |  2 +-
 .../kafka/streams/processor/internals/RecordCollectorImpl.java |  2 +-
 .../apache/kafka/streams/processor/internals/StreamThread.java |  4 ++--
 4 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
index f2fa594..5a284e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
 
+    private final Task task;
+
     public TaskMigratedException(final Task task) {
         this(task, null);
     }
@@ -42,11 +44,19 @@ public class TaskMigratedException extends StreamsException {
                             pos,
                             task.toString("> ")),
             null);
+
+        this.task = task;
     }
 
     public TaskMigratedException(final Task task,
                                  final Throwable throwable) {
         super(task.toString(), throwable);
+
+        this.task = task;
+    }
+
+    public Task migratedTask() {
+        return task;
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 8ce7dc9..dcaa755 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -153,7 +153,7 @@ public class PartitionGroup {
         final RecordQueue recordQueue = partitionQueues.get(partition);
 
         if (recordQueue == null) {
-            throw new IllegalStateException("Record's partition does not 
belong to this partition-group.");
+            throw new IllegalStateException(String.format("Record's partition 
%s does not belong to this partition-group.", partition));
         }
 
         return recordQueue.size();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index afdadf2..286cd81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -121,7 +121,7 @@ public class RecordCollectorImpl implements RecordCollector {
             errorLogMessage += PARAMETER_HINT;
             errorMessage += PARAMETER_HINT;
         }
-        log.error(errorLogMessage, key, value, timestamp, topic, exception);
+        log.error(errorLogMessage, key, value, timestamp, topic, 
exception.toString());
         sendException = new StreamsException(
             String.format(errorMessage,
                           logPrefix,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb133c6..064a293 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -755,9 +755,9 @@ public class StreamThread extends Thread {
             try {
                 recordsProcessedBeforeCommit = 
runOnce(recordsProcessedBeforeCommit);
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
-                log.warn("Detected a task that got migrated to another thread. 
" +
+                log.warn("Detected task {} that got migrated to another 
thread. " +
                     "This implies that this thread missed a rebalance and 
dropped out of the consumer group. " +
-                    "Trying to rejoin the consumer group now.", 
ignoreAndRejoinGroup);
+                    "Trying to rejoin the consumer group now. Below is the 
detailed description of the task:\n{}", 
ignoreAndRejoinGroup.migratedTask().id(), 
ignoreAndRejoinGroup.migratedTask().toString(">"));
             }
         }
     }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to