Repository: incubator-samza Updated Branches: refs/heads/0.7.0 0cf3e3c2d -> 4683fa755
SAMZA-247; move test performance task into samza-test main jar Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4683fa75 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4683fa75 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4683fa75 Branch: refs/heads/0.7.0 Commit: 4683fa755a9858e5f5d695aa1fff2aaa30be65c9 Parents: 0cf3e3c Author: Chris Riccomini <[email protected]> Authored: Tue Apr 22 13:17:08 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Apr 22 13:17:39 2014 -0700 ---------------------------------------------------------------------- .../test/performance/TestPerformanceTask.scala | 62 ++++++++++++++++++++ .../TestSamzaContainerPerformance.scala | 52 ---------------- 2 files changed, 62 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4683fa75/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala new file mode 100644 index 0000000..12c5259 --- /dev/null +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala @@ -0,0 +1,62 @@ +package org.apache.samza.test.performance + +import org.apache.samza.task.TaskContext +import org.apache.samza.task.InitableTask +import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.task.MessageCollector +import org.apache.samza.task.StreamTask +import org.apache.samza.task.TaskCoordinator +import org.apache.samza.config.Config +import grizzled.slf4j.Logging + +object TestPerformanceTask { + var messagesProcessed = 0 + var startTime = 0L +} + +/** + * A little test task that prints how many messages a SamzaContainer has + * received, and over what period of time. The messages-processed count is + * stored statically, so that all tasks in a single SamzaContainer increment + * the same counter. + * + * The log interval is configured with task.log.interval, which defines how many + * messages to process before printing a log line. The task will continue running + * until task.max.messages have been processed, at which point it will shut + * itself down. + */ +class TestPerformanceTask extends StreamTask with InitableTask with Logging { + import TestPerformanceTask._ + + /** + * How many messages to process before a log message is printed. + */ + var logInterval = 10000 + + /** + * How many messages to process before shutting down. + */ + var maxMessages = 100000 + + def init(config: Config, context: TaskContext) { + logInterval = config.getInt("task.log.interval", 10000) + maxMessages = config.getInt("task.max.messages", 100000) + } + + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + if (startTime == 0) { + startTime = System.currentTimeMillis + } + + messagesProcessed += 1 + + if (messagesProcessed % logInterval == 0) { + val seconds = (System.currentTimeMillis - startTime) / 1000 + info("Processed %s messages in %s seconds." format (messagesProcessed, seconds)) + } + + if (messagesProcessed >= maxMessages) { + coordinator.shutdown + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4683fa75/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala index 3dc2630..4016768 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala @@ -102,55 +102,3 @@ class TestSamzaContainerPerformance extends Logging{ job.waitForFinish(Int.MaxValue) } } - -object TestPerformanceTask { - var messagesProcessed = 0 - var startTime = 0L -} - -/** - * A little test task that prints how many messages a SamzaContainer has - * received, and over what period of time. The messages-processed count is - * stored statically, so that all tasks in a single SamzaContainer increment - * the same counter. - * - * The log interval is configured with task.log.interval, which defines how many - * messages to process before printing a log line. The task will continue running - * until task.max.messages have been processed, at which point it will shut - * itself down. - */ -class TestPerformanceTask extends StreamTask with InitableTask with Logging { - import TestPerformanceTask._ - - /** - * How many messages to process before a log message is printed. - */ - var logInterval = 10000 - - /** - * How many messages to process before shutting down. - */ - var maxMessages = 100000 - - def init(config: Config, context: TaskContext) { - logInterval = config.getInt("task.log.interval", 10000) - maxMessages = config.getInt("task.max.messages", 100000) - } - - def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { - if (startTime == 0) { - startTime = System.currentTimeMillis - } - - messagesProcessed += 1 - - if (messagesProcessed % logInterval == 0) { - val seconds = (System.currentTimeMillis - startTime) / 1000 - info("Processed %s messages in %s seconds." format (messagesProcessed, seconds)) - } - - if (messagesProcessed >= maxMessages) { - coordinator.shutdown - } - } -} \ No newline at end of file
