Repository: incubator-samza
Updated Branches:
  refs/heads/master 8dd3a140f -> fa03273c9


SAMZA-379; turn down verbosity of debug logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/fa03273c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/fa03273c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/fa03273c

Branch: refs/heads/master
Commit: fa03273c93c84892afe4522ff53015b0e7a5760e
Parents: 8dd3a14
Author: Chris Riccomini <cricc...@criccomi-mn.linkedin.biz>
Authored: Thu Aug 14 10:46:29 2014 -0700
Committer: Chris Riccomini <cricc...@criccomi-mn.linkedin.biz>
Committed: Thu Aug 14 10:46:29 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/container/RunLoop.scala  |  3 +--
 .../org/apache/samza/system/SystemConsumers.scala   | 16 ++++++++--------
 .../samza/system/kafka/KafkaSystemProducer.scala    |  6 +++---
 3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/fa03273c/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 6851731..3a264ad 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -78,7 +78,6 @@ class RunLoop(
     metrics.processes.inc
 
     updateTimer(metrics.processMs) {
-
       val envelope = updateTimer(metrics.chooseMs) {
         consumerMultiplexer.choose
       }
@@ -90,8 +89,8 @@ class RunLoop(
         metrics.envelopes.inc
 
         val taskInstance = systemStreamPartitionToTaskInstance(ssp)
-
         val coordinator = new ReadableCoordinator(taskInstance.taskName)
+
         taskInstance.process(envelope, coordinator)
         checkCoordinator(coordinator)
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/fa03273c/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index fef7227..deb966b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -188,7 +188,7 @@ class SystemConsumers(
     val envelopeFromChooser = chooser.choose
 
     if (envelopeFromChooser == null) {
-      debug("Chooser returned null.")
+      trace("Chooser returned null.")
 
       metrics.choseNull.inc
 
@@ -197,7 +197,7 @@ class SystemConsumers(
     } else {
       val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
 
-      debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
+      trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
 
       // Ok to give the chooser a new message from this stream.
       timeout = 0
@@ -221,11 +221,11 @@ class SystemConsumers(
    * messages to process.
    */
   private def poll(systemName: String) {
-    debug("Polling system consumer: %s" format systemName)
+    trace("Polling system consumer: %s" format systemName)
 
     metrics.systemPolls(systemName).inc
 
-    debug("Getting fetch map for system: %s" format systemName)
+    trace("Getting fetch map for system: %s" format systemName)
 
     val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName)
 
@@ -233,13 +233,13 @@ class SystemConsumers(
     if (systemFetchSet.size > 0) {
       val consumer = consumers(systemName)
 
-      debug("Fetching: %s" format systemFetchSet)
+      trace("Fetching: %s" format systemFetchSet)
 
       metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size)
 
       val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, timeout)
 
-      debug("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes)
+      trace("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes)
 
       metrics.systemMessagesPerPoll(systemName).inc
 
@@ -262,12 +262,12 @@ class SystemConsumers(
         }
       }
     } else {
-      debug("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName))
+      trace("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName))
     }
   }
 
   private def refresh {
-    debug("Refreshing chooser with new messages.")
+    trace("Refreshing chooser with new messages.")
 
     // Update last poll time so we don't poll too frequently.
     lastPollMs = clock()

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/fa03273c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index c5fe462..22c8f0c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -53,7 +53,7 @@ class KafkaSystemProducer(
   }
 
   def send(source: String, envelope: OutgoingMessageEnvelope) {
-    debug("Enqueueing message: %s, %s." format (source, envelope))
+    trace("Enqueueing message: %s, %s." format (source, envelope))
 
     metrics.sends.inc
 
@@ -70,7 +70,7 @@ class KafkaSystemProducer(
 
   def flush(source: String) {
     val buffer = sourceBuffers(source)
-    debug("Flushing buffer with size: %s." format buffer.size)
+    trace("Flushing buffer with size: %s." format buffer.size)
     metrics.flushes.inc
 
     retryBackoff.run(
@@ -99,6 +99,6 @@ class KafkaSystemProducer(
     )
 
     buffer.clear
-    debug("Flushed buffer.")
+    trace("Flushed buffer.")
   }
 }

Reply via email to