Repository: kafka
Updated Branches:
  refs/heads/trunk 23a014052 -> 2427a4476


MINOR: log4j improvements on assigned tasks and store changelog reader

Author: Guozhang Wang <[email protected]>

Reviewers: Matthias J. Sax <[email protected]>, Xavier Léauté 
<[email protected]>, Damian Guy <[email protected]>, Bill Bejeck 
<[email protected]>

Closes #4031 from guozhangwang/KMinor-assigned-task-log4j


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2427a447
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2427a447
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2427a447

Branch: refs/heads/trunk
Commit: 2427a44768f9734179c957b645df0476e7cb6d05
Parents: 23a0140
Author: Guozhang Wang <[email protected]>
Authored: Fri Oct 6 15:42:18 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Oct 6 15:42:18 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/AssignedTasks.java     | 8 ++++----
 .../streams/processor/internals/StoreChangelogReader.java    | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2427a447/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 12c3f79..6ab807f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -122,7 +122,7 @@ class AssignedTasks implements RestoringTasks {
             final Map.Entry<TaskId, Task> entry = it.next();
             try {
                 if (!entry.getValue().initialize()) {
-                    log.debug("transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
+                    log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue(), readyPartitions);
@@ -140,7 +140,7 @@ class AssignedTasks implements RestoringTasks {
         if (restored.isEmpty()) {
             return Collections.emptySet();
         }
-        log.trace("{} partitions restored for {}", taskTypeName, restored);
+        log.trace("{} changelog partitions that have completed restoring so 
far: {}", taskTypeName, restored);
         final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, Task>> it = 
restoring.entrySet().iterator(); it.hasNext(); ) {
@@ -153,10 +153,10 @@ class AssignedTasks implements RestoringTasks {
                 if (log.isTraceEnabled()) {
                     final HashSet<TopicPartition> outstandingPartitions = new 
HashSet<>(task.changelogPartitions());
                     outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("partition restoration not complete for {} {} 
partitions: {}",
+                    log.trace("{} {} cannot resume processing yet since some 
of its changelog partitions have not completed restoring: {}",
                               taskTypeName,
                               task.id(),
-                              task.changelogPartitions());
+                              outstandingPartitions);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2427a447/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index cc298e2..bbe570c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -194,7 +194,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
     private Collection<TopicPartition> completed() {
         final Set<TopicPartition> completed = new 
HashSet<>(stateRestorers.keySet());
         completed.removeAll(needsRestoring.keySet());
-        log.debug("completed partitions {}", completed);
+        log.trace("The set of restoration completed partitions so far: {}", 
completed);
         return completed;
     }
 

Reply via email to