Updated Branches: refs/heads/master bda8df6e2 -> 9b360405c
SAMZA-111; performance fix in SystemConsumers to speed up consumers that have a high number of partitions (e.g. 100s or more). Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9b360405 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9b360405 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9b360405 Branch: refs/heads/master Commit: 9b360405c598a95e08c41f75a29742b9f20bbcee Parents: bda8df6 Author: Chris Riccomini <[email protected]> Authored: Thu Jan 2 15:15:19 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Jan 2 15:15:19 2014 -0800 ---------------------------------------------------------------------- .reviewboardrc | 2 +- build.gradle | 13 +- .../apache/samza/system/SystemConsumers.scala | 131 +++++++++++++- samza-test/java.hprof.txt | 65 ------- .../samza/system/mock/MockSystemAdmin.java | 56 ++++++ .../samza/system/mock/MockSystemConsumer.java | 172 +++++++++++++++++++ .../samza/system/mock/MockSystemFactory.java | 99 +++++++++++ .../test/integration/TestStatefulTask.scala | 1 - .../TestSamzaContainerPerformance.scala | 156 +++++++++++++++++ 9 files changed, 620 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/.reviewboardrc ---------------------------------------------------------------------- diff --git a/.reviewboardrc b/.reviewboardrc index 0ee6a71..0a0a83d 100644 --- a/.reviewboardrc +++ b/.reviewboardrc @@ -1 +1 @@ -REPOSITORY = 'git://git.apache.org/incubator-samza.git' \ No newline at end of file +REPOSITORY = 'git://git.apache.org/incubator-samza.git' http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 49a8459..04b0de7 100644 --- a/build.gradle +++ b/build.gradle @@ -179,6 +179,7 @@ project(":samza-test_$scalaVersion") { dependencies { compile project(':samza-api') compile project(":samza-kv_$scalaVersion") + compile project(":samza-core_$scalaVersion") compile "org.scala-lang:scala-library:$scalaVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" @@ -187,12 +188,18 @@ project(":samza-test_$scalaVersion") { compile files("../samza-kafka/lib/kafka_$scalaVersion-" + kafkaVersion + ".jar") testCompile files("../samza-kafka/lib/kafka_$scalaVersion-" + kafkaVersion + "-test.jar") testCompile "com.101tec:zkclient:$zkClientVersion" - testCompile project(":samza-core_$scalaVersion") testCompile project(":samza-kafka_$scalaVersion") + testRuntime "org.slf4j:slf4j-simple:1.6.2" } test { - // Bump up the heap so we can start ZooKeeper and Kafka brokers. - maxHeapSize = "1024m" + // Bump up the heap so we can start ZooKeeper and Kafka brokers. Also + // required for TestSamzaContainerPerformance when a high thread count + // with a lot of inputs is used. + maxHeapSize = "4096m" + + // Forward all samza.* system properties to test subprocesses. This is + // useful for configuring TestSamzaContainerPerformance from the CLI. 
+ systemProperties = System.properties.findAll { it.key.startsWith("samza") } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 5cbffe5..dd7d357 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -25,19 +25,116 @@ import org.apache.samza.serializers.SerdeManager import grizzled.slf4j.Logging import org.apache.samza.system.chooser.MessageChooser +/** + * The SystemConsumers class coordinates between all SystemConsumers, the + * MessageChooser, and the SamzaContainer. Its job is to poll each + * SystemConsumer for messages, update the + * {@link org.apache.samza.system.chooser.MessageChooser} with new incoming + * messages, poll the MessageChooser for the next message to process, and + * return that message to the SamzaContainer. + */ class SystemConsumers( + + /** + * The class that determines the order to process incoming messages. + */ chooser: MessageChooser, + + /** + * A map of SystemConsumers that should be polled for new messages. + */ consumers: Map[String, SystemConsumer], + + /** + * The class that handles deserialization of incoming messages. + */ serdeManager: SerdeManager = new SerdeManager, + + /** + * A helper class to hold all of SystemConsumers' metrics. + */ metrics: SystemConsumersMetrics = new SystemConsumersMetrics, + + /** + * The maximum number of messages to poll from a single SystemStreamPartition. + */ maxMsgsPerStreamPartition: Int = 1000, + + /** + * A percentage threshold that determines when a SystemStreamPartition + * should be polled again. 0.0 means poll for more messages only when + * SystemConsumer's buffer is totally empty. 0.2 means poll for more messages + * when SystemConsumers' buffer is 80% empty. SystemConsumers' buffer size + * is determined by maxMsgsPerStreamPartition. + */ + fetchThresholdPct: Float = 0f, + + /** + * If MessageChooser returns null when it's polled, SystemConsumers will + * poll each SystemConsumer with a timeout next time it tries to poll for + * messages. Setting the timeout to 0 means that SamzaContainer's main + * thread will sit in a tight loop polling every SystemConsumer over and + * over again if no new messages are available. + */ noNewMessagesTimeout: Long = 10) extends Logging { + /** + * A buffer of incoming messages grouped by SystemStreamPartition. + */ var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]() + + /** + * The MessageChooser only gets updated with one message-per-SystemStreamPartition + * at a time. The MessageChooser will not receive a second message from the + * same SystemStreamPartition until the first message that it received has + * been returned to SystemConsumers. This set keeps track of which + * SystemStreamPartitions are valid to give to the MessageChooser. + */ var neededByChooser = Set[SystemStreamPartition]() + + /** + * A map of every SystemStreamPartition that SystemConsumers is responsible + * for polling. The values are how many messages to poll for during the next + * SystemConsumers.poll call. 
+ * + * If the value for a SystemStreamPartition is maxMsgsPerStreamPartition, + * then the implication is that SystemConsumers has no incoming messages in + * its buffer for the SystemStreamPartition. If the value is 0 then the + * SystemConsumers' buffer is full for the SystemStreamPartition. + */ var fetchMap = Map[SystemStreamPartition, java.lang.Integer]() + + /** + * A cache of fetchMap values, grouped according to the system. This is + * purely a trick to get better performance out of the SystemConsumsers + * class, since the map from systemName to its fetchMap is used for every + * poll call. + */ + var systemFetchMapCache = Map[String, Map[SystemStreamPartition, java.lang.Integer]]() + + /** + * Default timeout to noNewMessagesTimeout. Every time SystemConsumers + * receives incoming messages, it sets timout to 0. Every time + * SystemConsumers receives no new incoming messages from the MessageChooser, + * it sets timeout to noNewMessagesTimeout again. + */ var timeout = noNewMessagesTimeout + /** + * Used to determine when the next poll should take place for a given + * SystemStreamPartition. SystemConsumers inspects the value of fetchMap for each + * SystemStreamPartition, and decides to poll for the SystemStreamPartition + * if the fetchMap value is greater than or equal to the + * depletedQueueSizeThreshold. For example, suppose the fetchThresholdPct is + * 0.2, and the maxMsgsPerStreamPartition is 1000. This would result in + * depletedQueueSizeThreshold being 800. This a SystemStreamPartition with a + * fetchMap value of 936 (164 messages in the buffer is less than 20% of + * 1000) would be polled for more messages, while a SystemStream partition + * with a fetchMap value of 548 would not be polled for more messages (452 + * messages in the buffer is greater than 20% of 1000). + */ + val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt + debug("Got stream consumers: %s" format consumers) debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition) debug("Got no new message timeout: %s" format noNewMessagesTimeout) @@ -72,7 +169,7 @@ class SystemConsumers( debug("Registering stream: %s, %s" format (systemStreamPartition, lastReadOffset)) neededByChooser += systemStreamPartition - fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition + updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition) unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]() consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset) chooser.register(systemStreamPartition, lastReadOffset) @@ -115,11 +212,15 @@ class SystemConsumers( // If we have messages for a stream that the chooser needs, then update. if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) { chooser.update(unprocessedMessages(systemStreamPartition).dequeue) - fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue + 1) + updateFetchMap(systemStreamPartition) neededByChooser -= systemStreamPartition }) } + /** + * Poll a system for new messages from SystemStreamPartitions that have + * dipped below the depletedQueueSizeThreshold threshold. 
+ */ private def poll(systemName: String) = { debug("Polling system consumer: %s" format systemName) @@ -127,9 +228,9 @@ class SystemConsumers( val consumer = consumers(systemName) - debug("Filtering for system: %s, %s" format (systemName, fetchMap)) + debug("Getting fetch map for system: %s" format systemName) - val systemFetchMap = fetchMap.filterKeys(_.getSystem.equals(systemName)) + val systemFetchMap = systemFetchMapCache(systemName) debug("Fetching: %s" format systemFetchMap) @@ -147,7 +248,7 @@ class SystemConsumers( debug("Got message for: %s, %s" format (systemStreamPartition, envelope)) - fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue - 1) + updateFetchMap(systemStreamPartition, -1) debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap)) @@ -156,4 +257,24 @@ class SystemConsumers( debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages)) }) } + + /** + * A helper method that updates both fetchMap and systemFetchMapCache + * simultaneously. This is a convenience method to make sure that the + * systemFetchMapCache stays in sync with fetchMap. + */ + private def updateFetchMap(systemStreamPartition: SystemStreamPartition, amount: Int = 1) { + val fetchSize = fetchMap.getOrElse(systemStreamPartition, new Integer(0)).intValue + amount + val systemName = systemStreamPartition.getSystem + var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map()) + + if (fetchSize >= depletedQueueSizeThreshold) { + systemFetchMap += systemStreamPartition -> fetchSize + } else { + systemFetchMap -= systemStreamPartition + } + + fetchMap += systemStreamPartition -> fetchSize + systemFetchMapCache += systemName -> systemFetchMap + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/java.hprof.txt ---------------------------------------------------------------------- diff --git a/samza-test/java.hprof.txt b/samza-test/java.hprof.txt deleted file mode 100644 index c5693d2..0000000 --- a/samza-test/java.hprof.txt +++ /dev/null @@ -1,65 +0,0 @@ -JAVA PROFILE 1.0.1, created Sat Aug 3 14:27:50 2013 - -Header for -agentlib:hprof (or -Xrunhprof) ASCII Output (JDK 5.0 JVMTI based) - -%W% %E% - - Copyright (c) 2006 Sun Microsystems, Inc. All Rights Reserved. - -WARNING! This file format is under development, and is subject to -change without notice. - -This file contains the following types of records: - -THREAD START -THREAD END mark the lifetime of Java threads - -TRACE represents a Java stack trace. Each trace consists - of a series of stack frames. Other records refer to - TRACEs to identify (1) where object allocations have - taken place, (2) the frames in which GC roots were - found, and (3) frequently executed methods. - -HEAP DUMP is a complete snapshot of all live objects in the Java - heap. Following distinctions are made: - - ROOT root set as determined by GC - CLS classes - OBJ instances - ARR arrays - -SITES is a sorted list of allocation sites. This identifies - the most heavily allocated object types, and the TRACE - at which those allocations occurred. - -CPU SAMPLES is a statistical profile of program execution. The VM - periodically samples all running threads, and assigns - a quantum to active TRACEs in those threads. Entries - in this record are TRACEs ranked by the percentage of - total quanta they consumed; top-ranked TRACEs are - typically hot spots in the program. 
- -CPU TIME is a profile of program execution obtained by measuring - the time spent in individual methods (excluding the time - spent in callees), as well as by counting the number of - times each method is called. Entries in this record are - TRACEs ranked by the percentage of total CPU time. The - "count" field indicates the number of times each TRACE - is invoked. - -MONITOR TIME is a profile of monitor contention obtained by measuring - the time spent by a thread waiting to enter a monitor. - Entries in this record are TRACEs ranked by the percentage - of total monitor contention time and a brief description - of the monitor. The "count" field indicates the number of - times the monitor was contended at that TRACE. - -MONITOR DUMP is a complete snapshot of all the monitors and threads in - the System. - -HEAP DUMP, SITES, CPU SAMPLES|TIME and MONITOR DUMP|TIME records are generated -at program exit. They can also be obtained during program execution by typing -Ctrl-\ (on Solaris) or by typing Ctrl-Break (on Win32). - --------- - http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java new file mode 100644 index 0000000..13ac689 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.mock; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.samza.Partition; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemStreamPartition; + +/** + * A SystemAdmin that returns a constant set of partitions for all streams. + */ +public class MockSystemAdmin implements SystemAdmin { + private final Set<Partition> partitions; + + public MockSystemAdmin(int partitionCount) { + this.partitions = new HashSet<Partition>(); + + for (int i = 0; i < partitionCount; ++i) { + partitions.add(new Partition(i)); + } + } + + @Override + public Set<Partition> getPartitions(String streamName) { + // Partitions are immutable, so making the set immutable should make the + // partition set fully safe to re-use. 
+ return Collections.unmodifiableSet(partitions); + } + + @Override + public Map<SystemStreamPartition, String> getLastOffsets(Set<String> streams) { + throw new RuntimeException("MockSystemAdmin doesn't implement this method."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java new file mode 100644 index 0000000..c0791d7 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.mock; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.util.Clock; + +/** + * MockSystemConsumer is a class that simulates a multi-threaded consumer that + * uses BlockingEnvelopeMap. The primary use for this class is to do performance + * testing. + * + * This class works by starting up (threadCount) threads. Each thread adds + * (messagesPerBatch) to the BlockingEnvelopeMap, then sleeps for + * (brokerSleepMs). The sleep is important to simulate network latency when + * executing a fetch against a remote streaming system (i.e. Kafka). + */ +public class MockSystemConsumer extends BlockingEnvelopeMap { + private final int messagesPerBatch; + private final int threadCount; + private final int brokerSleepMs; + + /** + * The SystemStreamPartitions that this consumer is in charge of. + */ + private final Set<SystemStreamPartition> ssps; + + /** + * The consumer threads that are putting IncomingMessageEnvelopes into + * BlockingEnvelopeMap. + */ + private List<Thread> threads; + + /** + * + * @param messagesPerBatch + * The number of messages to add to the BlockingEnvelopeMap before + * sleeping. + * @param threadCount + * How many threads to run. + * @param brokerSleepMs + * How long each thread should sleep between batch writes. 
+ */ + public MockSystemConsumer(int messagesPerBatch, int threadCount, int brokerSleepMs) { + super(new MetricsRegistryMap("test-container-performance"), new Clock() { + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + }); + + this.messagesPerBatch = messagesPerBatch; + this.threadCount = threadCount; + this.brokerSleepMs = brokerSleepMs; + this.ssps = new HashSet<SystemStreamPartition>(); + this.threads = new ArrayList<Thread>(threadCount); + } + + /** + * Assign SystemStreamPartitions to all of the threads, and start them up to + * begin simulating consuming messages. + */ + @Override + public void start() { + for (int i = 0; i < threadCount; ++i) { + Set<SystemStreamPartition> threadSsps = new HashSet<SystemStreamPartition>(); + + // Assign SystemStreamPartitions for this thread. + for (SystemStreamPartition ssp : ssps) { + if (Math.abs(ssp.hashCode()) % threadCount == i) { + threadSsps.add(ssp); + } + } + + // Start thread. + Thread thread = new Thread(new MockSystemConsumerRunnable(threadSsps)); + thread.setDaemon(true); + threads.add(thread); + thread.start(); + } + } + + /** + * Kill all the threads, and shutdown. + */ + @Override + public void stop() { + for (Thread thread : threads) { + thread.interrupt(); + } + + try { + for (Thread thread : threads) { + thread.join(); + } + } catch (InterruptedException e) { + } + } + + @Override + public void register(SystemStreamPartition systemStreamPartition, String lastReadOffset) { + super.register(systemStreamPartition, lastReadOffset); + ssps.add(systemStreamPartition); + setIsAtHead(systemStreamPartition, true); + } + + /** + * The worker thread for MockSystemConsumer that simulates reading messages + * from a remote streaming system (i.e. Kafka), and writing them to the + * BlockingEnvelopeMap. + */ + public class MockSystemConsumerRunnable implements Runnable { + private final Set<SystemStreamPartition> ssps; + + public MockSystemConsumerRunnable(Set<SystemStreamPartition> ssps) { + this.ssps = ssps; + } + + @Override + public void run() { + try { + while (!Thread.interrupted() && ssps.size() > 0) { + Set<SystemStreamPartition> sspsToFetch = new HashSet<SystemStreamPartition>(); + + // Only fetch messages when there are no outstanding messages left. + for (SystemStreamPartition ssp : ssps) { + if (getNumMessagesInQueue(ssp) <= 0) { + sspsToFetch.add(ssp); + } + } + + // Simulate a broker fetch request's network latency. + Thread.sleep(brokerSleepMs); + + // Add messages to the BlockingEnvelopeMap. + for (SystemStreamPartition ssp : sspsToFetch) { + for (int i = 0; i < messagesPerBatch; ++i) { + add(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value")); + } + } + } + } catch (InterruptedException e) { + System.out.println("Got interrupt. Shutting down."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java new file mode 100644 index 0000000..02d6d37 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.mock; + +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; + +/** + * MockSystemFactory was built to make performance testing easier. + */ +public class MockSystemFactory implements SystemFactory { + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + MockSystemConsumerConfig consumerConfig = new MockSystemConsumerConfig(systemName, config); + + return new MockSystemConsumer(consumerConfig.getMessagesPerBatch(), consumerConfig.getConsumerThreadCount(), consumerConfig.getBrokerSleepMs()); + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new RuntimeException("MockSystemProducer not implemented."); + } + + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + MockSystemConsumerConfig consumerConfig = new MockSystemConsumerConfig(systemName, config); + + return new MockSystemAdmin(consumerConfig.getPartitionsPerStream()); + } + + /** + * A helper class that's useful for yanking out MockSystem's configuration + * out. + */ + public static class MockSystemConsumerConfig { + public static final int DEFAULT_PARTITION_COUNT = 4; + public static final int DEFAULT_MESSAGES_PER_BATCH = 5000; + public static final int DEFAULT_CONSUMER_THREAD_COUNT = 12; + public static final int DEFAULT_BROKER_SLEEP_MS = 1; + + private final String systemName; + private final Config config; + + public MockSystemConsumerConfig(String systemName, Config config) { + this.systemName = systemName; + this.config = config; + } + + /** + * @return the partition count to be used for MockSystemAdmin. + */ + public int getPartitionsPerStream() { + return config.getInt("systems." + systemName + ".partitions.per.stream", DEFAULT_PARTITION_COUNT); + } + + /** + * @return the messages per batch to be used for the MockSystemConsumer. + */ + public int getMessagesPerBatch() { + return config.getInt("systems." + systemName + ".messages.per.batch", DEFAULT_MESSAGES_PER_BATCH); + } + + /** + * @return the number of threads to be used for the MockSystemConsumer. + */ + public int getConsumerThreadCount() { + return config.getInt("systems." + systemName + ".consumer.thread.count", DEFAULT_CONSUMER_THREAD_COUNT); + } + + /** + * @return the milliseconds to sleep between each batch in + * MockSystemConsumer. + */ + public int getBrokerSleepMs() { + return config.getInt("systems." 
+ systemName + ".broker.sleep.ms", DEFAULT_BROKER_SLEEP_MS); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 9602a52..55c24f8 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -228,7 +228,6 @@ class TestStatefulTask { "stores.mystore.msg.serde" -> "string", "stores.mystore.changelog" -> "kafka.mystore", "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", - "systems.kafka.samza.partition.manager" -> "org.apache.samza.stream.kafka.KafkaPartitionManager", "systems.kafka.consumer.zookeeper.connect" -> zkConnect, "systems.kafka.consumer.auto.offset.reset" -> "smallest", "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1), http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala new file mode 100644 index 0000000..e5a676e --- /dev/null +++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.performance + +import org.junit.Test +import org.apache.samza.task.StreamTask +import org.apache.samza.task.TaskCoordinator +import org.apache.samza.task.MessageCollector +import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.job.local.LocalJobFactory +import org.apache.samza.config.MapConfig +import scala.collection.JavaConversions._ +import org.apache.samza.job.ShellCommandBuilder +import org.apache.samza.task.InitableTask +import org.apache.samza.task.TaskContext +import org.apache.samza.config.Config +import grizzled.slf4j.Logging + +/** + * A simple unit test that drives the TestPerformanceTask. 
This unit test can + * be triggered by itself using: + * + * <pre> + * ./gradlew :samza-test:test -Dtest.single=TestSamzaContainerPerformance + * <pre> + * + * Once the test is running, you can attach JConsole, VisualVM, or YourKit to + * have a look at how things are behaving. + * + * The test can be configured with the following system properties: + * + * <pre> + * samza.mock.consumer.thread.count + * samza.mock.messages.per.batch + * samza.mock.input.streams + * samza.mock.partitions.per.stream + * samza.mock.broker.sleep.ms + * samza.task.log.interval + * samza.task.max.messages + * <pre> + * + * For example, you might specify wish to process 10000 messages simulated + * from two input streams on one broker: + * + * <pre> + * ./gradlew :samza-test:test \ + * -Dsamza.test.single=TestSamzaContainerPerformance \ + * -Psamza.mock.input.streams=2 \ + * -Psamza.mock.consumer.thread.count=1 \ + * -Psamza.task.log.interval=1000 \ + * -Psamza.task.max.messages=10000 + * <pre> + */ +class TestSamzaContainerPerformance extends Logging{ + val consumerThreadCount = System.getProperty("samza.mock.consumer.thread.count", "12").toInt + val messagesPerBatch = System.getProperty("samza.mock.messages.per.batch", "5000").toInt + val streamCount = System.getProperty("samza.mock.input.streams", "1000").toInt + val partitionsPerStreamCount = System.getProperty("samza.mock.partitions.per.stream", "4").toInt + val brokerSleepMs = System.getProperty("samza.mock.broker.sleep.ms", "1").toInt + var logInterval = System.getProperty("samza.task.log.interval", "10000").toInt + var maxMessages = System.getProperty("samza.task.max.messages", "100000").toInt + + val jobConfig = Map( + "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory", + "job.name" -> "test-container-performance", + "task.class" -> classOf[TestPerformanceTask].getName, + "task.inputs" -> (0 until streamCount).map(i => "mock.stream" + i).mkString(","), + "task.log.interval" -> logInterval.toString, + "task.max.messages" -> maxMessages.toString, + "systems.mock.samza.factory" -> "org.apache.samza.system.mock.MockSystemFactory", + "systems.mock.partitions.per.stream" -> partitionsPerStreamCount.toString, + "systems.mock.messages.per.batch" -> messagesPerBatch.toString, + "systems.mock.consumer.thread.count" -> consumerThreadCount.toString, + "systems.mock.broker.sleep.ms" -> brokerSleepMs.toString) + + @Test + def testContainerPerformance { + info("Testing performance with configuration: %s" format jobConfig) + + val jobFactory = new LocalJobFactory + val job = jobFactory + .getJob(new MapConfig(jobConfig)) + .submit + + job.waitForFinish(Int.MaxValue) + } +} + +object TestPerformanceTask { + var messagesProcessed = 0 + var startTime = 0L +} + +/** + * A little test task that prints how many messages a SamzaContainer has + * received, and over what period of time. The messages-processed count is + * stored statically, so that all tasks in a single SamzaContainer increment + * the same counter. + * + * The log interval is configured with task.log.interval, which defines how many + * messages to process before printing a log line. The task will continue running + * until task.max.messages have been processed, at which point it will shut + * itself down. + */ +class TestPerformanceTask extends StreamTask with InitableTask with Logging { + import TestPerformanceTask._ + + /** + * How many messages to process before a log message is printed. + */ + var logInterval = 10000 + + /** + * How many messages to process before shutting down. 
+ */ + var maxMessages = 100000 + + def init(config: Config, context: TaskContext) { + logInterval = config.getInt("task.log.interval", 10000) + maxMessages = config.getInt("task.max.messages", 100000) + } + + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + if (startTime == 0) { + startTime = System.currentTimeMillis + } + + messagesProcessed += 1 + + if (messagesProcessed % logInterval == 0) { + val seconds = (System.currentTimeMillis - startTime) / 1000 + info("Processed %s messages in %s seconds." format (messagesProcessed, seconds)) + } + + if (messagesProcessed >= maxMessages) { + coordinator.shutdown + } + } +} \ No newline at end of file
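
For readers who want the gist of the SystemConsumers change without walking the full diff above, here is a minimal, standalone sketch of the caching idea behind the fix. It is not the actual Samza code: SystemStreamPartition is replaced by a simple case class, and the names only mirror the fields added in the diff (fetchMap, systemFetchMapCache, depletedQueueSizeThreshold, updateFetchMap). The point is that poll(systemName) becomes a single cache lookup instead of filtering every registered partition on every call.

    // Standalone sketch, assuming a simplified (system, stream, partition) key.
    object FetchMapSketch {
      // Simplified stand-in for org.apache.samza.system.SystemStreamPartition.
      case class Ssp(system: String, stream: String, partition: Int)

      val maxMsgsPerStreamPartition = 1000
      val fetchThresholdPct = 0.2f

      // A partition is re-fetched only once its remaining fetch budget climbs
      // back to this value, i.e. once its buffer is at least 20% empty here.
      val depletedQueueSizeThreshold =
        (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt

      // How many more messages each partition may be polled for.
      var fetchMap = Map[Ssp, Int]()

      // Per-system view of the partitions currently worth polling. Keeping it
      // up to date here means a poll for one system is a single map lookup.
      var systemFetchMapCache = Map[String, Map[Ssp, Int]]()

      def updateFetchMap(ssp: Ssp, amount: Int = 1): Unit = {
        val fetchSize = fetchMap.getOrElse(ssp, 0) + amount
        var systemFetchMap = systemFetchMapCache.getOrElse(ssp.system, Map[Ssp, Int]())

        // Only partitions whose buffers have drained past the threshold stay
        // in the per-system cache, so a poll sees exactly what it needs.
        if (fetchSize >= depletedQueueSizeThreshold) {
          systemFetchMap += ssp -> fetchSize
        } else {
          systemFetchMap -= ssp
        }

        fetchMap += ssp -> fetchSize
        systemFetchMapCache += ssp.system -> systemFetchMap
      }

      // What a poll(systemName) call would hand to the underlying consumer.
      def fetchMapFor(systemName: String): Map[Ssp, Int] =
        systemFetchMapCache.getOrElse(systemName, Map[Ssp, Int]())

      def main(args: Array[String]): Unit = {
        val ssp = Ssp("mock", "stream0", 0)
        updateFetchMap(ssp, maxMsgsPerStreamPartition) // register: buffer empty
        println(fetchMapFor("mock"))                   // eligible for polling
        updateFetchMap(ssp, -300)                      // 300 messages buffered
        println(fetchMapFor("mock"))                   // below threshold, dropped
      }
    }

With hundreds or thousands of registered partitions, the previous per-poll filterKeys over the whole fetchMap was the hot spot; paying the bookkeeping cost once per fetch-count change instead is what the commit message refers to as the performance fix.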
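
As a usage illustration, the following hypothetical driver (SmallPerfRun is an invented name, and it assumes the samza-test classes added in this commit are on the classpath) mirrors TestSamzaContainerPerformance's jobConfig with a much smaller workload, which can be handy when attaching a profiler outside of Gradle:

    import org.apache.samza.config.MapConfig
    import org.apache.samza.job.local.LocalJobFactory
    import org.apache.samza.test.performance.TestPerformanceTask
    import scala.collection.JavaConversions._

    object SmallPerfRun {
      def main(args: Array[String]): Unit = {
        // Same keys as TestSamzaContainerPerformance, scaled down: two mock
        // streams, four partitions each, one consumer thread, 10000 messages.
        val config = Map(
          "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",
          "job.name" -> "small-perf-run",
          "task.class" -> classOf[TestPerformanceTask].getName,
          "task.inputs" -> "mock.stream0,mock.stream1",
          "task.log.interval" -> "1000",
          "task.max.messages" -> "10000",
          "systems.mock.samza.factory" -> "org.apache.samza.system.mock.MockSystemFactory",
          "systems.mock.partitions.per.stream" -> "4",
          "systems.mock.messages.per.batch" -> "100",
          "systems.mock.consumer.thread.count" -> "1",
          "systems.mock.broker.sleep.ms" -> "1")

        new LocalJobFactory()
          .getJob(new MapConfig(config))
          .submit
          .waitForFinish(Int.MaxValue)
      }
    }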