Repository: kafka
Updated Branches:
  refs/heads/trunk 23a014052 -> 2427a4476
MINOR: log4j improvements on assigned tasks and store changelog reader

Author: Guozhang Wang <[email protected]>

Reviewers: Matthias J. Sax <[email protected]>, Xavier Léauté <[email protected]>, Damian Guy <[email protected]>, Bill Bejeck <[email protected]>

Closes #4031 from guozhangwang/KMinor-assigned-task-log4j

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2427a447
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2427a447
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2427a447

Branch: refs/heads/trunk
Commit: 2427a44768f9734179c957b645df0476e7cb6d05
Parents: 23a0140
Author: Guozhang Wang <[email protected]>
Authored: Fri Oct 6 15:42:18 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Oct 6 15:42:18 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/AssignedTasks.java | 8 ++++----
 .../streams/processor/internals/StoreChangelogReader.java | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/2427a447/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 12c3f79..6ab807f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -122,7 +122,7 @@ class AssignedTasks implements RestoringTasks {
             final Map.Entry<TaskId, Task> entry = it.next();
             try {
                 if (!entry.getValue().initialize()) {
-                    log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
+                    log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue(), readyPartitions);
@@ -140,7 +140,7 @@ class AssignedTasks implements RestoringTasks {
         if (restored.isEmpty()) {
             return Collections.emptySet();
         }
-        log.trace("{} partitions restored for {}", taskTypeName, restored);
+        log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
         final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
@@ -153,10 +153,10 @@ class AssignedTasks implements RestoringTasks {
                 if (log.isTraceEnabled()) {
                     final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
                     outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("partition restoration not complete for {} {} partitions: {}",
+                    log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
                               taskTypeName,
                               task.id(),
-                              task.changelogPartitions());
+                              outstandingPartitions);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2427a447/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index cc298e2..bbe570c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -194,7 +194,7 @@ public class StoreChangelogReader implements ChangelogReader {
     private Collection<TopicPartition> completed() {
         final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
         completed.removeAll(needsRestoring.keySet());
-        log.debug("completed partitions {}", completed);
+        log.trace("The set of restoration completed partitions so far: {}", completed);
         return completed;
     }
