Repository: incubator-samza Updated Branches: refs/heads/master 8dd3a140f -> fa03273c9
SAMZA-379; turn down verbosity of debug logging Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/fa03273c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/fa03273c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/fa03273c Branch: refs/heads/master Commit: fa03273c93c84892afe4522ff53015b0e7a5760e Parents: 8dd3a14 Author: Chris Riccomini <cricc...@criccomi-mn.linkedin.biz> Authored: Thu Aug 14 10:46:29 2014 -0700 Committer: Chris Riccomini <cricc...@criccomi-mn.linkedin.biz> Committed: Thu Aug 14 10:46:29 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/samza/container/RunLoop.scala | 3 +-- .../org/apache/samza/system/SystemConsumers.scala | 16 ++++++++-------- .../samza/system/kafka/KafkaSystemProducer.scala | 6 +++--- 3 files changed, 12 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/fa03273c/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 6851731..3a264ad 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -78,7 +78,6 @@ class RunLoop( metrics.processes.inc updateTimer(metrics.processMs) { - val envelope = updateTimer(metrics.chooseMs) { consumerMultiplexer.choose } @@ -90,8 +89,8 @@ class RunLoop( metrics.envelopes.inc val taskInstance = systemStreamPartitionToTaskInstance(ssp) - val coordinator = new ReadableCoordinator(taskInstance.taskName) + taskInstance.process(envelope, coordinator) checkCoordinator(coordinator) } else { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/fa03273c/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index fef7227..deb966b 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -188,7 +188,7 @@ class SystemConsumers( val envelopeFromChooser = chooser.choose if (envelopeFromChooser == null) { - debug("Chooser returned null.") + trace("Chooser returned null.") metrics.choseNull.inc @@ -197,7 +197,7 @@ class SystemConsumers( } else { val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition - debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) + trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) // Ok to give the chooser a new message from this stream. timeout = 0 @@ -221,11 +221,11 @@ class SystemConsumers( * messages to process. */ private def poll(systemName: String) { - debug("Polling system consumer: %s" format systemName) + trace("Polling system consumer: %s" format systemName) metrics.systemPolls(systemName).inc - debug("Getting fetch map for system: %s" format systemName) + trace("Getting fetch map for system: %s" format systemName) val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName) @@ -233,13 +233,13 @@ class SystemConsumers( if (systemFetchSet.size > 0) { val consumer = consumers(systemName) - debug("Fetching: %s" format systemFetchSet) + trace("Fetching: %s" format systemFetchSet) metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size) val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, timeout) - debug("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes) + trace("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes) metrics.systemMessagesPerPoll(systemName).inc @@ -262,12 +262,12 @@ class SystemConsumers( } } } else { - debug("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName)) + trace("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName)) } } private def refresh { - debug("Refreshing chooser with new messages.") + trace("Refreshing chooser with new messages.") // Update last poll time so we don't poll too frequently. lastPollMs = clock() http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/fa03273c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index c5fe462..22c8f0c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -53,7 +53,7 @@ class KafkaSystemProducer( } def send(source: String, envelope: OutgoingMessageEnvelope) { - debug("Enqueueing message: %s, %s." format (source, envelope)) + trace("Enqueueing message: %s, %s." format (source, envelope)) metrics.sends.inc @@ -70,7 +70,7 @@ class KafkaSystemProducer( def flush(source: String) { val buffer = sourceBuffers(source) - debug("Flushing buffer with size: %s." format buffer.size) + trace("Flushing buffer with size: %s." format buffer.size) metrics.flushes.inc retryBackoff.run( @@ -99,6 +99,6 @@ class KafkaSystemProducer( ) buffer.clear - debug("Flushed buffer.") + trace("Flushed buffer.") } }