Repository: kafka
Updated Branches:
  refs/heads/trunk 9662e466d -> e7663a306


MINOR: Improve on Streams log4j

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Dan Norwood, Matthias J. Sax, Jason Gustafson, Eno Thereska

Closes #2026 from guozhangwang/KMinor-improve-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e7663a30
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e7663a30
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e7663a30

Branch: refs/heads/trunk
Commit: e7663a306f40e9fcbc3096d17fb0f99fa3d11d1d
Parents: 9662e46
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Fri Oct 14 13:54:01 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Fri Oct 14 13:54:01 2016 -0700

----------------------------------------------------------------------
 .../processor/DefaultPartitionGrouper.java      |   6 +-
 .../processor/internals/AbstractTask.java       |   7 +-
 .../internals/ProcessorStateManager.java        |  45 +++---
 .../processor/internals/RecordCollector.java    |  13 +-
 .../processor/internals/RecordQueue.java        |   6 +
 .../processor/internals/StandbyTask.java        |   7 +-
 .../streams/processor/internals/StreamTask.java |  31 ++--
 .../processor/internals/StreamThread.java       | 148 ++++++++++---------
 8 files changed, 152 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 405ecd5..f0fb38c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -75,12 +75,12 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
     protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
         int maxNumPartitions = 0;
         for (String topic : topics) {
-            List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+            List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
 
-            if (infos == null)
+            if (partitions == null)
                 throw new StreamsException("Topic not found during partition 
assignment: " + topic);
 
-            int numPartitions = infos.size();
+            int numPartitions = partitions.size();
             if (numPartitions > maxNumPartitions)
                 maxNumPartitions = numPartitions;
         }
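
The rename from infos to partitions is cosmetic; the method still sizes the task group by the widest co-partitioned topic and fails fast when metadata is missing. A self-contained sketch of the same pattern, with a plain JDK map standing in for the Cluster metadata object (names here are illustrative, not Kafka API):

    import java.util.Map;
    import java.util.Set;

    final class MaxPartitionsSketch {
        // Same shape as DefaultPartitionGrouper.maxNumPartitions: scan every
        // co-partitioned topic, keep the largest partition count, and throw
        // as soon as metadata for a topic is missing.
        static int maxNumPartitions(Map<String, Integer> partitionCounts, Set<String> topics) {
            int max = 0;
            for (String topic : topics) {
                Integer count = partitionCounts.get(topic);
                if (count == null)
                    throw new IllegalStateException("Topic not found during partition assignment: " + topic);
                max = Math.max(max, count);
            }
            return max;
        }
    }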

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7bda3f6..bfdae6b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -69,7 +69,7 @@ public abstract class AbstractTask {
             this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap());
 
         } catch (IOException e) {
-            throw new ProcessorStateException("Error while creating the state manager", e);
+            throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
         }
     }
 
@@ -108,7 +108,6 @@ public abstract class AbstractTask {
 
     public abstract void commit();
 
-
     public abstract void close();
 
     public abstract void commitOffsets();
@@ -134,11 +133,11 @@ public abstract class AbstractTask {
                 OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
                 stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
             } catch (AuthorizationException e) {
-                throw new ProcessorStateException(String.format("AuthorizationException when initializing offsets for %s", partition), e);
+                throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
             } catch (WakeupException e) {
                 throw e;
             } catch (KafkaException e) {
-                throw new ProcessorStateException(String.format("Failed to initialize offsets for %s", partition), e);
+                throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e);
             }
         }
     }
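
The convention introduced in this file is to prepend the task id to every wrapped exception while passing the original exception along as the cause, so a stack trace from a thread running many tasks still identifies the failing one. A minimal sketch of that convention (class and method names here are hypothetical):

    final class TaskScopedExceptions {
        static void initOffsets(String taskId) {
            try {
                throw new RuntimeException("simulated client failure"); // stand-in for a KafkaException
            } catch (RuntimeException e) {
                // The prefix names the task; passing e as the cause keeps the original stack trace.
                throw new RuntimeException(String.format("task [%s] Failed to initialize offsets", taskId), e);
            }
        }
    }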

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 9d2e63f..52a47d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -52,6 +52,7 @@ public class ProcessorStateManager {
     public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
+    private final String logPrefix;
     private final String applicationId;
     private final int defaultPartition;
     private final Map<String, TopicPartition> partitionForTopic;
@@ -94,8 +95,10 @@ public class ProcessorStateManager {
         this.baseDir  = stateDirectory.directoryForTask(taskId);
         this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
 
+        this.logPrefix = String.format("task [%s]", taskId);
+
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new IOException(String.format("task [%s] Failed to lock the state directory: %s", taskId, baseDir.getCanonicalPath()));
+            throw new IOException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
         }
 
         // load the checkpoint information
@@ -121,13 +124,14 @@ public class ProcessorStateManager {
      * @throws StreamsException if the store's change log does not contain the partition
      */
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
+        log.debug("{} Registering state store {} to its state manager", logPrefix, store.name());
 
         if (store.name().equals(CHECKPOINT_FILE_NAME)) {
-            throw new IllegalArgumentException(String.format("task [%s] Illegal store name: %s", taskId, CHECKPOINT_FILE_NAME));
+            throw new IllegalArgumentException(String.format("%s Illegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME));
         }
 
         if (this.stores.containsKey(store.name())) {
-            throw new IllegalArgumentException(String.format("task [%s] Store %s has already been registered.", taskId, store.name()));
+            throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name()));
         }
 
         if (loggingEnabled) {
@@ -160,16 +164,16 @@ public class ProcessorStateManager {
                 // ignore
             }
 
-            List<PartitionInfo> partitionInfos = null;
+            List<PartitionInfo> partitions;
             try {
-                partitionInfos = restoreConsumer.partitionsFor(topic);
+                partitions = restoreConsumer.partitionsFor(topic);
             } catch (TimeoutException e) {
-                throw new StreamsException(String.format("task [%s] Could not 
find partition info for topic: %s", taskId, topic));
+                throw new StreamsException(String.format("%s Could not find 
partition info for topic: %s", logPrefix, topic));
             }
-            if (partitionInfos == null) {
-                throw new StreamsException(String.format("task [%s] Could not 
find partition info for topic: %s", taskId, topic));
+            if (partitions == null) {
+                throw new StreamsException(String.format("%s Could not find 
partition info for topic: %s", logPrefix, topic));
             }
-            for (PartitionInfo partitionInfo : partitionInfos) {
+            for (PartitionInfo partitionInfo : partitions) {
                 if (partitionInfo.partition() == partition) {
                     partitionNotFound = false;
                     break;
@@ -178,14 +182,19 @@ public class ProcessorStateManager {
         } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
 
         if (partitionNotFound) {
-            throw new StreamsException(String.format("task [%s] Store %s's 
change log (%s) does not contain partition %s", taskId, store.name(), topic, 
partition));
+            throw new StreamsException(String.format("%s Store %s's change log 
(%s) does not contain partition %s",
+                    logPrefix, store.name(), topic, partition));
         }
 
         if (isStandby) {
             if (store.persistent()) {
+                log.trace("{} Preparing standby replica of persistent state 
store {} with changelog topic {}", logPrefix, store.name(), topic);
+
                 restoreCallbacks.put(topic, stateRestoreCallback);
             }
         } else {
+            log.trace("{} Restoring state store {} from changelog topic {}", 
logPrefix, store.name(), topic);
+
             restoreActiveState(topic, stateRestoreCallback);
         }
 
@@ -197,7 +206,7 @@ public class ProcessorStateManager {
 
         // subscribe to the store's partition
         if (!restoreConsumer.subscription().isEmpty()) {
-            throw new IllegalStateException(String.format("task [%s] Restore consumer should have not subscribed to any partitions beforehand", taskId));
+            throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions beforehand", logPrefix));
         }
         TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
         restoreConsumer.assign(Collections.singletonList(storePartition));
@@ -233,7 +242,7 @@ public class ProcessorStateManager {
                 } else if (restoreConsumer.position(storePartition) > endOffset) {
                     // For a logging enabled changelog (no offset limit),
                     // the log end offset should not change while restoring since it is only written by this thread.
-                    throw new IllegalStateException(String.format("task [%s] Log end offset should not change while restoring", taskId));
+                    throw new IllegalStateException(String.format("%s Log end offset should not change while restoring", logPrefix));
                 }
             }
 
@@ -278,7 +287,7 @@ public class ProcessorStateManager {
                 try {
                     restoreCallback.restore(record.key(), record.value());
                 } catch (Exception e) {
-                    throw new ProcessorStateException(String.format("task [%s] 
exception caught while trying to restore state from %s", taskId, 
storePartition), e);
+                    throw new ProcessorStateException(String.format("%s 
exception caught while trying to restore state from %s", logPrefix, 
storePartition), e);
                 }
                 lastOffset = record.offset();
             } else {
@@ -310,7 +319,7 @@ public class ProcessorStateManager {
 
     public void flush(final InternalProcessorContext context) {
         if (!this.stores.isEmpty()) {
-            log.debug("task [{}] Flushing stores.", taskId);
+            log.debug("{} Flushing all stores registered in the state 
manager", logPrefix);
             for (StateStore store : this.stores.values()) {
                 final ProcessorNode processorNode = 
stateStoreProcessorNodeMap.get(store);
                 if (processorNode != null) {
@@ -319,7 +328,7 @@ public class ProcessorStateManager {
                 try {
                     store.flush();
                 } catch (Exception e) {
-                    throw new ProcessorStateException(String.format("task [%s] 
Failed to flush state store %s", taskId, store.name()), e);
+                    throw new ProcessorStateException(String.format("%s Failed 
to flush state store %s", logPrefix, store.name()), e);
                 }
             }
         }
@@ -333,13 +342,13 @@ public class ProcessorStateManager {
             // attempting to close the stores, just in case they
             // are not closed by a ProcessorNode yet
             if (!stores.isEmpty()) {
-                log.debug("task [{}] Closing stores.", taskId);
+                log.debug("{} Closing its state manager and all the registered 
state stores", logPrefix);
                 for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                    log.debug("task [{}} Closing storage engine {}", taskId, 
entry.getKey());
+                    log.debug("{} Closing storage engine {}", logPrefix, 
entry.getKey());
                     try {
                         entry.getValue().close();
                     } catch (Exception e) {
-                        throw new ProcessorStateException(String.format("task [%s] Failed to close state store %s", taskId, entry.getKey()), e);
+                        throw new ProcessorStateException(String.format("%s Failed to close state store %s", logPrefix, entry.getKey()), e);
                     }
                 }
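
The recurring refactor above is to format the "task [<id>]" prefix once in the constructor and reuse that field in every log statement and exception message, rather than re-deriving it at each call site. A minimal sketch of the pattern in a hypothetical component:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class PrefixedComponent {
        private static final Logger log = LoggerFactory.getLogger(PrefixedComponent.class);

        private final String logPrefix;

        PrefixedComponent(String taskId) {
            // Built once; every message below then shares the identical prefix,
            // which makes grepping a multi-task log by task id reliable.
            this.logPrefix = String.format("task [%s]", taskId);
        }

        void flush() {
            log.debug("{} Flushing all stores registered in the state manager", logPrefix);
        }
    }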
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index cd5ee1c..63d6a3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -54,12 +54,13 @@ public class RecordCollector {
 
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
-    private String streamTaskId = null;
+    private final String logPrefix;
+
 
     public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
         this.producer = producer;
         this.offsets = new HashMap<>();
-        this.streamTaskId = streamTaskId;
+        this.logPrefix = String.format("task [%s]", streamTaskId);
     }
 
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
@@ -90,17 +91,16 @@ public class RecordCollector {
                             TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                             offsets.put(tp, metadata.offset());
                         } else {
-                            String prefix = String.format("task [%s]", streamTaskId);
-                            log.error(String.format("%s Error sending record to topic %s", prefix, topic), exception);
+                            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
                         }
                     }
                 });
                 return;
             } catch (TimeoutException e) {
                 if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("task [%s] failed to send record to topic %s after %d attempts", streamTaskId, topic, attempt));
+                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
                 }
-                log.warn(String.format("task [%s] timeout exception caught when sending record to topic %s attempt %s", streamTaskId, topic, attempt));
+                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
                 Utils.sleep(SEND_RETRY_BACKOFF);
             }
 
@@ -108,6 +108,7 @@ public class RecordCollector {
     }
 
     public void flush() {
+        log.debug("{} Flushing producer", logPrefix);
         this.producer.flush();
     }
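
Note the switch from log.error(String.format(...), exception) to the parameterized form: SLF4J substitutes {} placeholders only when the level is enabled, and when the final argument is a Throwable with no matching placeholder it is rendered as a stack trace. A small sketch of that behavior (class name is arbitrary):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class TrailingThrowableSketch {
        private static final Logger log = LoggerFactory.getLogger(TrailingThrowableSketch.class);

        static void demo(String logPrefix, String topic, Exception exception) {
            // Two placeholders, three arguments: the trailing Exception has no
            // placeholder, so SLF4J appends its stack trace to the log entry.
            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
        }
    }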
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 156b45d..5199c96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 
@@ -34,6 +36,8 @@ import static java.lang.String.format;
  */
 public class RecordQueue {
 
+    private static final Logger log = LoggerFactory.getLogger(RecordQueue.class);
+
     private final SourceNode source;
     private final TopicPartition partition;
     private final ArrayDeque<StampedRecord> fifoQueue;
@@ -100,6 +104,8 @@
                                                                          rawRecord.serializedValueSize(), key, value);
             long timestamp = timestampExtractor.extract(record);
 
+            log.trace("Source node {} extracted timestamp {} for record {} 
when adding to buffered queue", source.name(), timestamp, record);
+
             // validate that timestamp must be non-negative
             if (timestamp < 0)
                 throw new StreamsException("Extracted timestamp value is 
negative, which is not allowed.");
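
Adding a per-record log.trace here stays cheap because the parameterized form defers message construction until the trace level is actually enabled. A sketch of the cost difference (names are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class TraceCostSketch {
        private static final Logger log = LoggerFactory.getLogger(TraceCostSketch.class);

        static void onRecord(String sourceName, long timestamp, Object record) {
            // No message string is built unless TRACE is enabled:
            log.trace("Source node {} extracted timestamp {} for record {}", sourceName, timestamp, record);

            // This variant would concatenate on every call, even with TRACE off:
            // log.trace("Source node " + sourceName + " extracted timestamp " + timestamp + " for record " + record);
        }
    }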

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index e57b44a..ac8b0ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -61,11 +61,10 @@ public class StandbyTask extends AbstractTask {
                        StreamsMetrics metrics, final StateDirectory stateDirectory) {
         super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null);
 
-        log.info("task [{}] Creating processorContext", id());
-
         // initialize the topology with its own context
         this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
 
+        log.info("standby-task [{}] Initializing state stores", id());
         initializeStateStores();
 
         ((StandbyContextImpl) this.processorContext).initialized();
@@ -86,12 +85,12 @@ public class StandbyTask extends AbstractTask {
      * @return a list of records not consumed
      */
     public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
-        log.debug("task [{}] Updates for partition [{}]", id(), partition);
+        log.debug("standby-task [{}] Updating standby replicas of its state store for partition [{}]", id(), partition);
         return stateMgr.updateStandbyStates(partition, records);
     }
 
     public void commit() {
-        log.debug("task [{}] Flushing", id());
+        log.debug("standby-task [{}] Committing its state", id());
         stateMgr.flush(processorContext);
 
         // reinitialize offset limits

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 061cfeb..b993054 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -48,14 +48,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
-    private final int maxBufferedSize;
-
+    private final String logPrefix;
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
     private final PunctuationQueue punctuationQueue;
 
     private final Map<TopicPartition, Long> consumedOffsets;
     private final RecordCollector recordCollector;
+    private final int maxBufferedSize;
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
@@ -101,6 +101,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
             partitionQueues.put(partition, queue);
         }
 
+        this.logPrefix = String.format("task [%s]", id);
+
         TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
@@ -110,15 +112,15 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // create the record recordCollector that maintains the produced offsets
         this.recordCollector = new RecordCollector(producer, id().toString());
 
-        log.info("task [{}] Creating restoration consumer client", id());
-
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
 
         // initialize the state stores
+        log.info("{} Initializing state stores", logPrefix);
         initializeStateStores();
 
         // initialize the task by initializing all its processor nodes in the topology
+        log.info("{} Initializing processor nodes of the topology", logPrefix);
         for (ProcessorNode node : this.topology.processors()) {
             this.currNode = node;
             try {
@@ -141,6 +143,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
     public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
         int queueSize = partitionGroup.addRawRecords(partition, records);
 
+        log.trace("{} Added records into the buffered queue of partition {}, 
new queue size is {}", logPrefix, partition, queueSize);
+
         // if after adding these records, its partition queue's buffered size 
has been
         // increased beyond the threshold, we can then pause the consumption 
for this partition
         if (queueSize > this.maxBufferedSize) {
@@ -171,12 +175,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
             this.currNode = recordInfo.node();
             TopicPartition partition = recordInfo.partition();
 
-            log.debug("task [{}] Start processing one record [{}]", id(), 
record);
+            log.trace("{} Start processing one record [{}]", logPrefix, 
record);
             final ProcessorRecordContext recordContext = 
createRecordContext(record);
             updateProcessorContext(recordContext, currNode);
             this.currNode.process(record.key(), record.value());
 
-            log.debug("task [{}] Completed processing one record [{}]", id(), 
record);
+            log.trace("{} Completed processing one record [{}]", logPrefix, 
record);
 
             // update the consumed offset map after processing is done
             consumedOffsets.put(partition, record.offset());
@@ -193,7 +197,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 requiresPoll = true;
             }
         } catch (KafkaException ke) {
-            throw new StreamsException(format("exception caught in process. 
taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
+            throw new StreamsException(format("Exception caught in process. 
taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
                                               id.toString(),
                                               currNode.name(),
                                               record.topic(),
@@ -238,15 +242,18 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @Override
     public void punctuate(ProcessorNode node, long timestamp) {
         if (currNode != null)
-            throw new IllegalStateException(String.format("task [%s] Current 
node is not null", id()));
+            throw new IllegalStateException(String.format("%s Current node is 
not null", logPrefix));
 
         currNode = node;
         final StampedRecord stampedRecord = new StampedRecord(DUMMY_RECORD, timestamp);
         updateProcessorContext(createRecordContext(stampedRecord), node);
+
+        log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, 
node.name(), timestamp);
+
         try {
             node.processor().punctuate(timestamp);
         } catch (KafkaException ke) {
-            throw new StreamsException(String.format("exception caught in 
punctuate. taskId=%s processor=%s", id,  node.name()), ke);
+            throw new StreamsException(String.format("Exception caught in 
punctuate. taskId=%s processor=%s", id,  node.name()), ke);
         } finally {
             processorContext.setCurrentNode(null);
             currNode = null;
@@ -262,6 +269,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * Commit the current task state
      */
     public void commit() {
+        log.debug("{} Committing its state", logPrefix);
+
         // 1) flush local state
         stateMgr.flush(processorContext);
 
@@ -314,7 +323,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     public void schedule(long interval) {
         if (currNode == null)
-            throw new IllegalStateException(String.format("task [%s] Current 
node is null", id()));
+            throw new IllegalStateException(String.format("%s Current node is 
null", logPrefix));
 
         punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
     }
@@ -324,6 +333,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @Override
     public void close() {
+        log.debug("{} Closing processor topology", logPrefix);
+
         this.partitionGroup.close();
         this.consumedOffsets.clear();
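
A theme in this file is demoting per-record messages from debug to trace: a busy task can process thousands of records per second, so record-level detail belongs at the most verbose level, while per-task lifecycle events stay at info or debug. A sketch of choosing levels by event frequency (an assumed convention inferred from this diff, not an API):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LogLevelSketch {
        private static final Logger log = LoggerFactory.getLogger(LogLevelSketch.class);

        static void lifecycle(String logPrefix) {
            log.info("{} Initializing state stores", logPrefix);  // once per task
            log.debug("{} Committing its state", logPrefix);      // once per commit interval
        }

        static void perRecord(String logPrefix, Object record) {
            log.trace("{} Start processing one record [{}]", logPrefix, record);  // per record
        }
    }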
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7663a30/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c3c6cc1..d7bb98c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -85,6 +85,7 @@ public class StreamThread extends Thread {
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
+    private final String logPrefix;
     private final String threadClientId;
     private final AtomicBoolean running;
     private final Map<TaskId, StreamTask> activeTasks;
@@ -117,6 +118,9 @@ public class StreamThread extends Thread {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
             try {
+                log.info("stream-thread [{}] New partitions [{}] assigned at 
the end of consumer rebalance.",
+                        StreamThread.this.getName(), assignment);
+
                 addStreamTasks(assignment);
                 addStandbyTasks();
                 lastCleanMs = time.milliseconds(); // start the cleaning cycle
@@ -131,6 +135,9 @@ public class StreamThread extends Thread {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
+                log.info("stream-thread [{}] partitions [{}] revoked at the 
beginning of consumer rebalance.",
+                        StreamThread.this.getName(), assignment);
+
                 initialized.set(false);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
                 shutdownTasksAndState(true);
@@ -180,17 +187,17 @@ public class StreamThread extends Thread {
         this.cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
             config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
-        // set the producer and consumer clients
 
 
-        log.info("stream-thread [{}] Creating producer client", threadName);
+        this.logPrefix = String.format("stream-thread [%s]", threadName);
+
+        // set the producer and consumer clients
+        log.info("{} Creating producer client", logPrefix);
         this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
-        log.info("stream-thread [{}] Creating consumer client", threadName);
-        this.consumer = clientSupplier.getConsumer(
-                config.getConsumerConfigs(this, applicationId, threadClientId));
-        log.info("stream-thread [{}] Creating restore consumer client", threadName);
-        this.restoreConsumer = clientSupplier.getRestoreConsumer(
-                config.getRestoreConsumerConfigs(threadClientId));
+        log.info("{} Creating consumer client", logPrefix);
+        this.consumer = clientSupplier.getConsumer(config.getConsumerConfigs(this, applicationId, threadClientId));
+        log.info("{} Creating restore consumer client", logPrefix);
+        this.restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
 
         // initialize the task list
         // activeTasks needs to be concurrent as it can be accessed
@@ -229,7 +236,7 @@ public class StreamThread extends Thread {
      */
     @Override
     public void run() {
-        log.info("stream-thread [{}] Starting", this.getName());
+        log.info("{} Starting", logPrefix);
 
         try {
             runLoop();
@@ -239,7 +246,7 @@ public class StreamThread extends Thread {
         } catch (Exception e) {
             // we have caught all Kafka related exceptions, and other runtime exceptions
             // should be due to user application errors
-            log.error(String.format("stream-thread [%s] Streams application 
error during processing: ", this.getName()),  e);
+            log.error("{} Streams application error during processing: ", 
logPrefix, e);
             throw e;
         } finally {
             shutdown();
@@ -258,32 +265,31 @@ public class StreamThread extends Thread {
     }
 
     private void shutdown() {
-        log.info("stream-thread [{}] Shutting down", this.getName());
+        log.info("{} Shutting down", logPrefix);
         shutdownTasksAndState(false);
 
-
         // close all embedded clients
         try {
             producer.close();
         } catch (Throwable e) {
-            log.error("stream-thread [{}] Failed to close producer: ", 
this.getName(), e);
+            log.error("{} Failed to close producer: ", logPrefix, e);
         }
         try {
             consumer.close();
         } catch (Throwable e) {
-            log.error("stream-thread [{}] Failed to close consumer: ", 
this.getName(), e);
+            log.error("{} Failed to close consumer: ", logPrefix, e);
         }
         try {
             restoreConsumer.close();
         } catch (Throwable e) {
-            log.error("stream-thread [{}] Failed to close restore consumer: ", 
this.getName(), e);
+            log.error("{} Failed to close restore consumer: ", logPrefix, e);
         }
 
         // remove all tasks
         removeStreamTasks();
         removeStandbyTasks();
 
-        log.info("stream-thread [{}] Stream thread shutdown complete", 
this.getName());
+        log.info("{} Stream thread shutdown complete", logPrefix);
     }
 
     private void shutdownTasksAndState(final boolean rethrowExceptions) {
@@ -301,7 +307,7 @@ public class StreamThread extends Thread {
             // un-assign the change log partitions
             restoreConsumer.assign(Collections.<TopicPartition>emptyList());
         } catch (Exception e) {
-            log.error(String.format("stream-thread [%s] Failed to un-assign 
change log partitions: ", this.getName()), e);
+            log.error("{} Failed to un-assign change log partitions: ", 
logPrefix, e);
             if (rethrowExceptions) {
                 throw e;
             }
@@ -321,12 +327,12 @@ public class StreamThread extends Thread {
             try {
                 action.apply(task);
             } catch (KafkaException e) {
-                log.error(String.format("stream-thread [%s] Failed to %s for 
%s %s: ",
-                                        StreamThread.this.getName(),
-                                        exceptionMessage,
-                                        task.getClass().getSimpleName(),
-                                        task.id()),
-                          e);
+                log.error("{} Failed while executing {} {} duet to {}: ",
+                        StreamThread.this.logPrefix,
+                        task.getClass().getSimpleName(),
+                        task.id(),
+                        exceptionMessage,
+                        e);
                 if (throwExceptions) {
                     throw e;
                 }
@@ -338,7 +344,8 @@ public class StreamThread extends Thread {
         performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                    task.closeStateManager();
+                log.info("{} Closing the state manager of task {}", 
StreamThread.this.logPrefix, task.id());
+                task.closeStateManager();
             }
         }, "close state manager", throwExceptions);
     }
@@ -348,6 +355,7 @@ public class StreamThread extends Thread {
         performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
+                log.info("{} Committing consumer offsets of task {}", 
StreamThread.this.logPrefix, task.id());
                 task.commitOffsets();
             }
         }, "commit consumer offsets", throwExceptions);
@@ -357,6 +365,7 @@ public class StreamThread extends Thread {
         performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
+                log.info("{} Flushing state stores of task {}", 
StreamThread.this.logPrefix, task.id());
                 task.flushState();
             }
         }, "flush state", throwExceptions);
@@ -398,7 +407,7 @@ public class StreamThread extends Thread {
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(longPoll ? this.pollTimeMs : 0);
 
                 if (rebalanceException != null)
-                    throw new StreamsException(String.format("stream-thread 
[%s] Failed to rebalance", this.getName()), rebalanceException);
+                    throw new StreamsException(logPrefix + " Failed to 
rebalance", rebalanceException);
 
                 if (!records.isEmpty()) {
                     for (TopicPartition partition : records.partitions()) {
@@ -482,7 +491,7 @@ public class StreamThread extends Thread {
                     StandbyTask task = standbyTasksByPartition.get(partition);
 
                     if (task == null) {
-                        throw new StreamsException(String.format("stream-thread [%s] missing standby task for partition %s", this.getName(), partition));
+                        throw new StreamsException(logPrefix + " Missing standby task for partition " + partition);
                     }
 
                     List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
@@ -497,7 +506,7 @@ public class StreamThread extends Thread {
 
     private boolean stillRunning() {
         if (!running.get()) {
-            log.debug("stream-thread [{}] Shutting down at user request", 
this.getName());
+            log.debug("{} Shutting down at user request", logPrefix);
             return false;
         }
 
@@ -512,7 +521,7 @@ public class StreamThread extends Thread {
                 sensors.punctuateTimeSensor.record(computeLatency());
 
         } catch (KafkaException e) {
-            log.error(String.format("stream-thread [%s] Failed to punctuate 
active task %s: ", this.getName(), task.id()), e);
+            log.error("{} Failed to punctuate active task {}: ", logPrefix, 
task.id(), e);
             throw e;
         }
     }
@@ -524,7 +533,7 @@ public class StreamThread extends Thread {
         long now = time.milliseconds();
 
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
-            log.trace("stream-thread [{}] Committing processor instances 
because the commit interval has elapsed", this.getName());
+            log.info("{} Committing all tasks because the commit interval {}ms 
has elapsed", logPrefix, commitTimeMs);
 
             commitAll();
             lastCommitMs = now;
@@ -561,14 +570,16 @@ public class StreamThread extends Thread {
      * Commit the state of a task
      */
     private void commitOne(AbstractTask task) {
+        log.info("{} Committing task {}", logPrefix, task.id());
+
         try {
             task.commit();
         } catch (CommitFailedException e) {
             // commit failed. Just log it.
-            log.warn(String.format("stream-thread [%s] Failed to commit %s %s: 
", this.getName(), task.getClass().getSimpleName(), task.id()), e);
+            log.warn("{} Failed to commit {} {} state: ", logPrefix, 
task.getClass().getSimpleName(), task.id(), e);
         } catch (KafkaException e) {
             // commit failed due to an unexpected exception. Log it and rethrow the exception.
-            log.error(String.format("stream-thread [%s] Failed to commit %s %s: ", this.getName(), task.getClass().getSimpleName(), task.id()), e);
+            log.error("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e);
             throw e;
         }
 
@@ -612,9 +623,9 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
-
-
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
+        log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions);
+
         sensors.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
@@ -624,7 +635,7 @@ public class StreamThread extends Thread {
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
         if (partitionAssignor == null)
-            throw new IllegalStateException(String.format("stream-thread [%s] 
Partition assignor has not been initialized while adding stream tasks: this 
should not happen.", this.getName()));
+            throw new IllegalStateException(logPrefix + " Partition assignor 
has not been initialized while adding stream tasks: this should not happen.");
 
         HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
 
@@ -652,26 +663,15 @@ public class StreamThread extends Thread {
                 for (TopicPartition partition : partitions)
                     activeTasksByPartition.put(partition, task);
             } catch (StreamsException e) {
-                log.error(String.format("stream-thread [%s] Failed to create 
an active task %s: ", this.getName(), taskId), e);
+                log.error("{} Failed to create an active task %s: ", 
logPrefix, taskId, e);
                 throw e;
             }
         }
     }
 
-    private void removeStreamTasks() {
-        try {
-            prevTasks.clear();
-            prevTasks.addAll(activeTasks.keySet());
-
-            activeTasks.clear();
-            activeTasksByPartition.clear();
+    private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
+        log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions);
 
-        } catch (Exception e) {
-            log.error(String.format("stream-thread [%s] Failed to remove 
stream tasks: ", this.getName()), e);
-        }
-    }
-
-    protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
@@ -685,7 +685,7 @@ public class StreamThread extends Thread {
 
     private void addStandbyTasks() {
         if (partitionAssignor == null)
-            throw new IllegalStateException(String.format("stream-thread [%s] 
Partition assignor has not been initialized while adding standby tasks: this 
should not happen.", this.getName()));
+            throw new IllegalStateException(logPrefix + " Partition assignor 
has not been initialized while adding standby tasks: this should not happen.");
 
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
@@ -721,6 +721,39 @@ public class StreamThread extends Thread {
         }
     }
 
+    private void removeStreamTasks() {
+        log.info("{} Removing all active tasks [{}]", logPrefix, 
activeTasks.keySet());
+
+        try {
+            prevTasks.clear();
+            prevTasks.addAll(activeTasks.keySet());
+
+            activeTasks.clear();
+            activeTasksByPartition.clear();
+
+        } catch (Exception e) {
+            log.error("{} Failed to remove stream tasks: ", logPrefix, e);
+        }
+    }
+
+    private void removeStandbyTasks() {
+        log.info("{} Removing all standby tasks [{}]", logPrefix, 
standbyTasks.keySet());
+
+        standbyTasks.clear();
+        standbyTasksByPartition.clear();
+        standbyRecords.clear();
+    }
+
+    private void closeAllTasks() {
+        performOnAllTasks(new AbstractTaskAction() {
+            @Override
+            public void apply(final AbstractTask task) {
+                log.info("{} Closing a task {}", StreamThread.this.logPrefix, 
task.id());
+                task.close();
+                sensors.taskDestructionSensor.record();
+            }
+        }, "close", false);
+    }
 
     /**
      * Produces a string representation contain useful information about a StreamThread.
@@ -755,23 +788,6 @@ public class StreamThread extends Thread {
         return sb.toString();
     }
 
-    private void closeAllTasks() {
-        performOnAllTasks(new AbstractTaskAction() {
-            @Override
-            public void apply(final AbstractTask task) {
-                log.info("stream-thread [{}] Removing a task {}", 
StreamThread.this.getName(), task.id());
-                task.close();
-                sensors.taskDestructionSensor.record();
-            }
-        }, "close", false);
-    }
-
-    private void removeStandbyTasks() {
-        standbyTasks.clear();
-        standbyTasksByPartition.clear();
-        standbyRecords.clear();
-    }
-
     private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {
         final Metrics metrics;
         final String metricGrpName;
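
StreamThread now logs assignments and revocations from inside its rebalance callbacks, which is often the only way to reconstruct a rebalance timeline after the fact. A minimal sketch of the same idea against the public consumer API (message wording is taken from this diff; the standalone class is hypothetical):

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LoggingRebalanceListener implements ConsumerRebalanceListener {
        private static final Logger log = LoggerFactory.getLogger(LoggingRebalanceListener.class);
        private final String logPrefix;

        LoggingRebalanceListener(String threadName) {
            this.logPrefix = String.format("stream-thread [%s]", threadName);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
            log.info("{} New partitions [{}] assigned at the end of consumer rebalance.", logPrefix, assignment);
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
            log.info("{} partitions [{}] revoked at the beginning of consumer rebalance.", logPrefix, revoked);
        }
    }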
