This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 332e698 MINOR: improve log4j messaging (#4530)
332e698 is described below
commit 332e698ac9c74ce29317021b03a54512c92ac8b3
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon Feb 5 16:35:12 2018 -0800
MINOR: improve log4j messaging (#4530)
Reviewers: Matthias J. Sax <[email protected]>, James Cheng
<[email protected]>
---
.../org/apache/kafka/streams/errors/TaskMigratedException.java | 10 ++++++++++
.../kafka/streams/processor/internals/PartitionGroup.java | 2 +-
.../kafka/streams/processor/internals/RecordCollectorImpl.java | 2 +-
.../apache/kafka/streams/processor/internals/StreamThread.java | 4 ++--
4 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
index f2fa594..5a284e4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException {
private final static long serialVersionUID = 1L;
+ private final Task task;
+
public TaskMigratedException(final Task task) {
this(task, null);
}
@@ -42,11 +44,19 @@ public class TaskMigratedException extends StreamsException
{
pos,
task.toString("> ")),
null);
+
+ this.task = task;
}
public TaskMigratedException(final Task task,
final Throwable throwable) {
super(task.toString(), throwable);
+
+ this.task = task;
+ }
+
+ public Task migratedTask() {
+ return task;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 8ce7dc9..dcaa755 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -153,7 +153,7 @@ public class PartitionGroup {
final RecordQueue recordQueue = partitionQueues.get(partition);
if (recordQueue == null) {
- throw new IllegalStateException("Record's partition does not
belong to this partition-group.");
+ throw new IllegalStateException(String.format("Record's partition
%s does not belong to this partition-group.", partition));
}
return recordQueue.size();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index afdadf2..286cd81 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -121,7 +121,7 @@ public class RecordCollectorImpl implements RecordCollector
{
errorLogMessage += PARAMETER_HINT;
errorMessage += PARAMETER_HINT;
}
- log.error(errorLogMessage, key, value, timestamp, topic, exception);
+ log.error(errorLogMessage, key, value, timestamp, topic,
exception.toString());
sendException = new StreamsException(
String.format(errorMessage,
logPrefix,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb133c6..064a293 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -755,9 +755,9 @@ public class StreamThread extends Thread {
try {
recordsProcessedBeforeCommit =
runOnce(recordsProcessedBeforeCommit);
} catch (final TaskMigratedException ignoreAndRejoinGroup) {
- log.warn("Detected a task that got migrated to another thread.
" +
+ log.warn("Detected task {} that got migrated to another
thread. " +
"This implies that this thread missed a rebalance and
dropped out of the consumer group. " +
- "Trying to rejoin the consumer group now.",
ignoreAndRejoinGroup);
+ "Trying to rejoin the consumer group now. Below is the
detailed description of the task:\n{}",
ignoreAndRejoinGroup.migratedTask().id(),
ignoreAndRejoinGroup.migratedTask().toString(">"));
}
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].