This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new b4a20e8  KAFKA-7921: log at error level for missing source topic 
(#6262)
b4a20e8 is described below

commit b4a20e8d59d0ba5fd4e0acba95e144b82dde0204
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Wed Feb 13 14:10:10 2019 -0600

    KAFKA-7921: log at error level for missing source topic (#6262)
    
    This condition is a fatal error, so error level is warranted, to provide 
more context on why Streams shuts down.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang 
<guozh...@confluent.io>
---
 .../java/org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
 .../kafka/streams/processor/internals/StreamsPartitionAssignor.java     | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e96c110..71df0f9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -267,7 +267,7 @@ public class StreamThread extends Thread {
                 taskManager.suspendedStandbyTaskIds());
 
             if (streamThread.assignmentErrorCode.get() == 
StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-                log.debug("Received error code {} - shutdown", 
streamThread.assignmentErrorCode.get());
+                log.error("Received error code {} - shutdown", 
streamThread.assignmentErrorCode.get());
                 streamThread.shutdown();
                 streamThread.setStateListener(null);
                 return;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2f649bc..3d94572 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -463,6 +463,8 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
             for (final String topic : topicsInfo.sourceTopics) {
                 if 
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
                     !metadata.topics().contains(topic)) {
+                    log.error("Missing source topic {} durign assignment. 
Returning error {}.",
+                              topic, 
Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
                     return errorAssignment(clientsMetadata, topic, 
Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
                 }
             }

Reply via email to