This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2162d07  KAFKA-6802: Improved logging for missing topics during task 
assignment (#4891)
2162d07 is described below

commit 2162d0713f9fa20c6b5c7fd151c7f66acd15e7ce
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Apr 18 18:35:10 2018 -0400

    KAFKA-6802: Improved logging for missing topics during task assignment 
(#4891)
    
    If users don't create all topics before starting a streams application, 
they could get unexpected results. For example, sharing a state store between 
sub-topologies where one input topic is not created ahead of time results in a log 
message, "Partition X is not assigned to any tasks", that does not give any 
clues as to how this could have occurred.
    
    Also, this PR changes the log level from INFO to WARN when metadata does 
not have partitions for a given topic.
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../org/apache/kafka/streams/processor/DefaultPartitionGrouper.java | 6 +++++-
 .../kafka/streams/processor/internals/StreamsPartitionAssignor.java | 6 +++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index c86171c..cee9488 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -82,7 +82,11 @@ public class DefaultPartitionGrouper implements 
PartitionGrouper {
             List<PartitionInfo> partitions = 
metadata.partitionsForTopic(topic);
 
             if (partitions.isEmpty()) {
-                log.info("Skipping assigning topic {} to tasks since its 
metadata is not available yet", topic);
+
+                log.warn("Skipping creating tasks for the topic group {} since 
topic {}'s metadata is not available yet;"
+                         + " no tasks for this topic group will be assigned to 
any client.\n"
+                         + " Make sure all supplied topics in the topology are 
created before starting"
+                         + " as this could lead to unexpected results", 
topics, topic);
                 return StreamsPartitionAssignor.NOT_AVAILABLE;
             } else {
                 int numPartitions = partitions.size();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index c81105e..1f00c04 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -470,7 +470,11 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                 for (final PartitionInfo partitionInfo : partitionInfoList) {
                     final TopicPartition partition = new 
TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
-                        log.warn("Partition {} is not assigned to any tasks: 
{}", partition, partitionsForTask);
+                        log.warn("Partition {} is not assigned to any tasks: 
{}"
+                                 + " Possible causes of a partition not 
getting assigned"
+                                 + " is that another topic defined in the 
topology has not been"
+                                 + " created when starting your streams 
application,"
+                                 + " resulting in no tasks created for this 
topology at all.", partition, partitionsForTask);
                     }
                 }
             } else {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to