Repository: incubator-samza Updated Branches: refs/heads/master 5c65b03a4 -> 7cecf0aef
SAMZA-245; improve system consumers performance Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7cecf0ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7cecf0ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7cecf0ae Branch: refs/heads/master Commit: 7cecf0aef9fb968b0851746bdb01eaca1dc81050 Parents: 5c65b03 Author: Chris Riccomini <criccomi@criccomi-mn.(none)> Authored: Thu Jul 24 15:40:49 2014 -0700 Committer: Chris Riccomini <criccomi@criccomi-mn.(none)> Committed: Thu Jul 24 15:40:49 2014 -0700 ---------------------------------------------------------------------- .../0.7.0/jobs/configuration-table.html | 15 + .../org/apache/samza/system/SystemConsumer.java | 31 ++- .../system/SystemStreamPartitionIterator.java | 21 +- .../apache/samza/util/BlockingEnvelopeMap.java | 56 ++-- .../TestSystemStreamPartitionIterator.java | 22 +- .../samza/util/TestBlockingEnvelopeMap.java | 40 +-- .../org/apache/samza/config/TaskConfig.scala | 19 ++ .../apache/samza/container/SamzaContainer.scala | 13 +- .../apache/samza/system/SystemConsumers.scala | 272 ++++++++++--------- .../samza/system/SystemConsumersMetrics.scala | 12 +- .../chooser/BufferingMessageChooser.scala | 208 -------------- .../system/chooser/RoundRobinChooser.scala | 27 +- .../org/apache/samza/util/DoublingBackOff.scala | 62 ----- .../samza/system/TestSystemConsumers.scala | 181 ++++++++++-- .../chooser/TestBufferingMessageChooser.scala | 108 -------- .../TestFileReaderSystemConsumer.scala | 45 +-- .../apache/samza/util/TestDoublingBackOff.scala | 61 ----- .../test/performance/TestPerformanceTask.scala | 4 +- .../TestSamzaContainerPerformance.scala | 2 +- 19 files changed, 453 insertions(+), 746 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/docs/learn/documentation/0.7.0/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html index edcb74f..41f334f 100644 --- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html +++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html @@ -377,6 +377,21 @@ </tr> <tr> + <td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td> + <td class="default">50</td> + <td class="description"> + Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining + buffered messages to process for any input SystemStreamPartition. The second condition arises when some input + SystemStreamPartitions have empty buffers, but some do not. In the latter case, a polling interval is defined to determine how + often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty + SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions + will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this + value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing + CPU and network utilization. 
+ </td> + </tr> + + <tr> <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th> </tr> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java index 591f8fb..37c6c76 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java @@ -21,6 +21,7 @@ package org.apache.samza.system; import java.util.List; import java.util.Map; +import java.util.Set; /** * <p> @@ -137,35 +138,39 @@ public interface SystemConsumer { * Poll the SystemConsumer to get any available messages from the underlying * system. * - * <p>If the underlying implementation does not take care to adhere to the + * <p> + * If the underlying implementation does not take care to adhere to the * timeout parameter, the SamzaContainer's performance will suffer * drastically. Specifically, if poll blocks when it's not supposed to, it * will block the entire main thread in SamzaContainer, and no messages will * be processed while blocking is occurring. + * </p> * * @param systemStreamPartitions - * A map from SystemStreamPartition to maximum number of messages to - * return for the SystemStreamPartition. Polling with {stream1: 100, - * stream2: 1000} tells the SystemConsumer that it can return between - * 0 and 100 messages (inclusive) for stream1, and between 0 and 1000 - * messages for stream2. If SystemConsumer has messages available for - * other registered SystemStreamPartitions, but they are not in the - * systemStreamPartitions map in a given poll invocation, they can't + * A set of SystemStreamPartition to poll for new messages. 
If + * SystemConsumer has messages available for other registered + * SystemStreamPartitions, but they are not in the + * systemStreamPartitions set in a given poll invocation, they can't * be returned. It is illegal to pass in SystemStreamPartitions that * have not been registered with the SystemConsumer first. * @param timeout * If timeout < 0, poll will block unless all SystemStreamPartition * are at "head" (the underlying system has been checked, and - * returned an empty set). If at head, an empty list is returned. If + * returned an empty set). If at head, an empty map is returned. If * timeout >= 0, poll will return any messages that are currently * available for any of the SystemStreamPartitions specified. If no * new messages are available, it will wait up to timeout * milliseconds for messages from any SystemStreamPartition to become - * available. It will return an empty list if the timeout is hit, and + * available. It will return an empty map if the timeout is hit, and * no new messages are available. - * @return A list of zero or more IncomingMessageEnvelopes for the - * SystemStreamPartitions that were supplied during the invocation. + * @return A map from SystemStreamPartitions to any available + * IncomingMessageEnvelopes for the SystemStreamPartitions. If no + * messages are available for a SystemStreamPartition that was + * supplied in the polling set, the map will not contain a key for the + * SystemStreamPartition. Will return an empty map, not null, if no + * new messages are available for any SystemStreamPartitions in the + * input set. 
* @throws InterruptedException */ - List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) throws InterruptedException; + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java index 9acfb10..a8f858a 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java @@ -20,22 +20,25 @@ package org.apache.samza.system; import java.util.ArrayDeque; -import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.Set; import org.apache.samza.SamzaException; /** - * {@link java.util.Iterator} that wraps a {@link org.apache.samza.system.SystemConsumer} to iterate over - * the messages the consumer provides for the specified {@link org.apache.samza.system.SystemStreamPartition}. + * {@link java.util.Iterator} that wraps a + * {@link org.apache.samza.system.SystemConsumer} to iterate over the messages + * the consumer provides for the specified + * {@link org.apache.samza.system.SystemStreamPartition}. 
*/ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEnvelope> { private final SystemConsumer systemConsumer; - private final Map<SystemStreamPartition, Integer> fetchMap; + private final Set<SystemStreamPartition> fetchSet; private Queue<IncomingMessageEnvelope> peeks; public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition) { @@ -44,8 +47,8 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, int fetchSize) { this.systemConsumer = systemConsumer; - this.fetchMap = new HashMap<SystemStreamPartition, Integer>(); - this.fetchMap.put(systemStreamPartition, fetchSize); + this.fetchSet = new HashSet<SystemStreamPartition>(); + this.fetchSet.add(systemStreamPartition); this.peeks = new ArrayDeque<IncomingMessageEnvelope>(); } @@ -74,10 +77,10 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn private void refresh() { if (peeks.size() == 0) { try { - List<IncomingMessageEnvelope> envelopes = systemConsumer.poll(fetchMap, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = systemConsumer.poll(fetchSet, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES); - if (envelopes != null && envelopes.size() > 0) { - peeks.addAll(envelopes); + for (List<IncomingMessageEnvelope> systemStreamPartitionEnvelopes : envelopes.values()) { + peeks.addAll(systemStreamPartitionEnvelopes); } } catch (InterruptedException e) { throw new SamzaException(e); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java index 9503739..317e073 100644 --- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java +++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java @@ -20,13 +20,16 @@ package org.apache.samza.util; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; + import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; @@ -39,8 +42,8 @@ import org.apache.samza.system.SystemStreamPartition; * BlockingEnvelopeMap is a helper class for SystemConsumer implementations. * Samza's poll() requirements make implementing SystemConsumers somewhat * tricky. BlockingEnvelopeMap is provided to help other developers write - * SystemConsumers. The intended audience is not those writing Samza jobs, - * but rather those extending Samza to consume from new types of stream providers + * SystemConsumers. The intended audience is not those writing Samza jobs, but + * rather those extending Samza to consume from new types of stream providers * and other systems. 
* </p> * @@ -97,55 +100,48 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { return new LinkedBlockingQueue<IncomingMessageEnvelope>(); } - public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException { + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { long stopTime = clock.currentTimeMillis() + timeout; - List<IncomingMessageEnvelope> messagesToReturn = new ArrayList<IncomingMessageEnvelope>(); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>(); metrics.incPoll(); - for (Map.Entry<SystemStreamPartition, Integer> systemStreamPartitionAndMaxCount : systemStreamPartitionAndMaxPerStream.entrySet()) { - SystemStreamPartition systemStreamPartition = systemStreamPartitionAndMaxCount.getKey(); - Integer numMessages = systemStreamPartitionAndMaxCount.getValue(); + for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) { BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition); - IncomingMessageEnvelope envelope = null; - List<IncomingMessageEnvelope> systemStreamPartitionMessages = new ArrayList<IncomingMessageEnvelope>(); - - // First, drain all messages up to numMessages without blocking. - // Stop when we've filled the request (max numMessages), or when - // we get a null envelope back. 
- for (int i = 0; i < numMessages && (i == 0 || envelope != null); ++i) { - envelope = queue.poll(); + List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size()); - if (envelope != null) { - systemStreamPartitionMessages.add(envelope); - } - } + if (queue.size() > 0) { + queue.drainTo(outgoingList); + } else if (timeout != 0) { + IncomingMessageEnvelope envelope = null; - // Now block if blocking is allowed and we have no messages. - if (systemStreamPartitionMessages.size() == 0) { // How long we can legally block (if timeout > 0) long timeRemaining = stopTime - clock.currentTimeMillis(); if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) { - while (systemStreamPartitionMessages.size() < numMessages && !isAtHead(systemStreamPartition)) { + // Block until we get at least one message, or until we catch up to + // the head of the stream. + while (envelope == null && !isAtHead(systemStreamPartition)) { metrics.incBlockingPoll(systemStreamPartition); envelope = queue.poll(1000, TimeUnit.MILLISECONDS); - - if (envelope != null) { - systemStreamPartitionMessages.add(envelope); - } } } else if (timeout > 0 && timeRemaining > 0) { + // Block until we get at least one message. metrics.incBlockingTimeoutPoll(systemStreamPartition); envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS); + } - if (envelope != null) { - systemStreamPartitionMessages.add(envelope); - } + // If we got a message, add it. + if (envelope != null) { + outgoingList.add(envelope); + // Drain any remaining messages without blocking. 
+ queue.drainTo(outgoingList); } } - messagesToReturn.addAll(systemStreamPartitionMessages); + if (outgoingList.size() > 0) { + messagesToReturn.put(systemStreamPartition, outgoingList); + } } return messagesToReturn; http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java index 3ecabab..5af2a11 100644 --- a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java +++ b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java @@ -24,13 +24,15 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; - -import org.junit.Test; +import java.util.Queue; +import java.util.Set; import org.apache.samza.Partition; +import org.junit.Test; public class TestSystemStreamPartitionIterator { private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0)); @@ -105,14 +107,20 @@ public class TestSystemStreamPartitionIterator { } @Override - public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) { - List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>(); + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) { + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> systemStreamPartitionEnvelopes = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>(); + + for (SystemStreamPartition 
systemStreamPartition : systemStreamPartitions) { + List<IncomingMessageEnvelope> q = new ArrayList<IncomingMessageEnvelope>(); + + if (numPollReturnsWithMessages-- > 0) { + q.add(new IncomingMessageEnvelope(SSP, "", null, numPollReturnsWithMessages)); + } - if (numPollReturnsWithMessages-- > 0) { - list.add(new IncomingMessageEnvelope(SSP, "", null, numPollReturnsWithMessages)); + systemStreamPartitionEnvelopes.put(systemStreamPartition, q); } - return list; + return systemStreamPartitionEnvelopes; } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java index cb4d148..35ba52d 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java @@ -19,26 +19,29 @@ package org.apache.samza.util; -import static org.junit.Assert.*; -import java.util.HashMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.junit.Test; import org.apache.samza.Partition; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; public class TestBlockingEnvelopeMap { private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0)); private static final IncomingMessageEnvelope envelope = 
new IncomingMessageEnvelope(SSP, null, null, null); - private static final Map<SystemStreamPartition, Integer> FETCH = new HashMap<SystemStreamPartition, Integer>(); + private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>(); static { - FETCH.put(SSP, 10); + FETCH.add(SSP); } @Test @@ -65,33 +68,14 @@ public class TestBlockingEnvelopeMap { BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(); map.register(SSP, "0"); map.put(SSP, envelope); - List<IncomingMessageEnvelope> envelopes = map.poll(FETCH, 0); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0); assertEquals(1, envelopes.size()); + assertEquals(1, envelopes.get(SSP).size()); map.put(SSP, envelope); map.put(SSP, envelope); envelopes = map.poll(FETCH, 0); - assertEquals(2, envelopes.size()); - } - - @Test - public void testShouldNotReturnMoreEnvelopesThanAllowed() throws InterruptedException { - BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(); - int maxMessages = FETCH.get(SSP); - - map.register(SSP, "0"); - - for (int i = 0; i < 3 * maxMessages; ++i) { - map.put(SSP, envelope); - } - - assertEquals(3 * maxMessages, map.getNumMessagesInQueue(SSP)); - assertEquals(maxMessages, map.poll(FETCH, 0).size()); - assertEquals(2 * maxMessages, map.getNumMessagesInQueue(SSP)); - assertEquals(maxMessages, map.poll(FETCH, 30).size()); - assertEquals(maxMessages, map.getNumMessagesInQueue(SSP)); - assertEquals(maxMessages, map.poll(FETCH, 0).size()); - assertEquals(0, map.getNumMessagesInQueue(SSP)); - assertEquals(0, map.poll(FETCH, 0).size()); + assertEquals(1, envelopes.size()); + assertEquals(2, envelopes.get(SSP).size()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 8b881f2..21d8903 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -36,6 +36,23 @@ object TaskConfig { val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails + /** + * Samza's container polls for more messages under two conditions. The first + * condition arises when there are simply no remaining buffered messages to + * process for any input SystemStreamPartition. The second condition arises + * when some input SystemStreamPartitions have empty buffers, but some do + * not. In the latter case, a polling interval is defined to determine how + * often to refresh the empty SystemStreamPartition buffers. By default, + * this interval is 50ms, which means that any empty SystemStreamPartition + * buffer will be refreshed at least every 50ms. A higher value here means + * that empty SystemStreamPartitions will be refreshed less often, which + * means more latency is introduced, but less CPU and network will be used. + * Decreasing this value means that empty SystemStreamPartitions are + * refreshed more frequently, thereby introducing less latency, but + * increasing CPU and network utilization. 
+ */ + val POLL_INTERVAL_MS = "task.poll.interval.ms" + implicit def Config2Task(config: Config) = new TaskConfig(config) } @@ -76,4 +93,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) { def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR) def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR) + + def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index bff6000..a7142b2 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -306,23 +306,28 @@ object SamzaContainer extends Logging { info("Got offset manager: %s" format offsetManager) - val dropDeserializationError: Boolean = config.getDropDeserialization match { + val dropDeserializationError = config.getDropDeserialization match { case Some(dropError) => dropError.toBoolean case _ => false } - val dropSerializationError: Boolean = config.getDropSerialization match { + val dropSerializationError = config.getDropSerialization match { case Some(dropError) => dropError.toBoolean case _ => false } + val pollIntervalMs = config + .getPollIntervalMs + .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString) + .toInt + val consumerMultiplexer = new SystemConsumers( - // TODO add config values for no new message timeout and max msgs per stream partition chooser = chooser, consumers = consumers, serdeManager = serdeManager, metrics = systemConsumersMetrics, - dropDeserializationError = dropDeserializationError) + dropDeserializationError = 
dropDeserializationError, + pollIntervalMs = pollIntervalMs) val producerMultiplexer = new SystemProducers( producers = producers, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 9eb70f2..fef7227 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -20,13 +20,21 @@ package org.apache.samza.system import scala.collection.JavaConversions._ -import scala.collection.mutable.Queue import org.apache.samza.serializers.SerdeManager import grizzled.slf4j.Logging import org.apache.samza.system.chooser.MessageChooser -import org.apache.samza.util.DoublingBackOff -import org.apache.samza.system.chooser.BufferingMessageChooser import org.apache.samza.SamzaException +import java.util.HashMap +import java.util.ArrayDeque +import java.util.Queue +import java.util.Set +import java.util.HashSet + +object SystemConsumers { + val DEFAULT_POLL_INTERVAL_MS = 50 + val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10 + val DEFAULT_DROP_SERIALIZATION_ERROR = false +} /** * The SystemConsumers class coordinates between all SystemConsumers, the @@ -59,98 +67,100 @@ class SystemConsumers( metrics: SystemConsumersMetrics = new SystemConsumersMetrics, /** - * The maximum number of messages to poll from a single SystemStreamPartition. - */ - maxMsgsPerStreamPartition: Int = 10000, - - /** * If MessageChooser returns null when it's polled, SystemConsumers will * poll each SystemConsumer with a timeout next time it tries to poll for * messages. 
Setting the timeout to 0 means that SamzaContainer's main * thread will sit in a tight loop polling every SystemConsumer over and * over again if no new messages are available. */ - noNewMessagesTimeout: Long = 10, + noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, /** - * This parameter is to define how to deal with deserialization failure. If set to true, - * the task will skip the messages when deserialization fails. If set to false, the task - * will throw SamzaException and fail the container. + * This parameter is to define how to deal with deserialization failure. If + * set to true, the task will skip the messages when deserialization fails. + * If set to false, the task will throw SamzaException and fail the container. */ - dropDeserializationError: Boolean = false) extends Logging { + dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, /** - * The buffer where SystemConsumers stores all incoming message envelopes. + * <p>Defines an upper bound for how long the SystemConsumers will wait + * before polling systems for more data. The default setting is 50ms, which + * means that SystemConsumers will poll for new messages for all + * SystemStreamPartitions with empty buffers every 50ms. SystemConsumers + * will also poll for new messages any time that there are no available + * messages to process, or any time the MessageChooser returns a null + * IncomingMessageEnvelope.</p> + * + * <p>This parameter also implicitly defines how much latency is introduced + * by SystemConsumers. If a message is available for a SystemStreamPartition + * with no remaining unprocessed messages, the SystemConsumers will poll for + * it within 50ms of its availability in the stream system.</p> */ - val buffer = new BufferingMessageChooser(chooser) + pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS, /** - * A map of every SystemStreamPartition that SystemConsumers is responsible - * for polling. 
The values are how many messages to poll for during the next - * SystemConsumers.poll call. - * - * If the value for a SystemStreamPartition is maxMsgsPerStreamPartition, - * then the implication is that SystemConsumers has no incoming messages in - * its buffer for the SystemStreamPartition. If the value is 0 then the - * SystemConsumers' buffer is full for the SystemStreamPartition. + * Clock can be used to inject a custom clock when mocking this class in + * tests. The default implementation returns the current system clock time. */ - var fetchMap = Map[SystemStreamPartition, java.lang.Integer]() + clock: () => Long = () => System.currentTimeMillis) extends Logging { /** - * A cache of fetchMap values, grouped according to the system. This is - * purely a trick to get better performance out of the SystemConsumsers - * class, since the map from systemName to its fetchMap is used for every - * poll call. + * A buffer of incoming messages grouped by SystemStreamPartition. These + * messages are handed out to the MessageChooser as it needs them. */ - var systemFetchMapCache = Map[String, Map[SystemStreamPartition, java.lang.Integer]]() + private val unprocessedMessagesBySSP = new HashMap[SystemStreamPartition, Queue[IncomingMessageEnvelope]]() + + /** + * A set of SystemStreamPartitions grouped by systemName. This is used as a + * cache to figure out which SystemStreamPartitions we need to poll from the + * underlying system consumer. + */ + private val emptySystemStreamPartitionsBySystem = new HashMap[String, Set[SystemStreamPartition]]() /** * Default timeout to noNewMessagesTimeout. Every time SystemConsumers - * receives incoming messages, it sets timout to 0. Every time + * receives incoming messages, it sets timeout to 0. Every time * SystemConsumers receives no new incoming messages from the MessageChooser, * it sets timeout to noNewMessagesTimeout again. 
*/ var timeout = noNewMessagesTimeout /** - * Make the maximum backoff proportional to the number of streams we're consuming. - * For a few streams, make the max back off 1, but for hundreds make it up to 1k, - * which experimentally has shown to be the most performant. + * The last time that systems were polled for new messages. */ - var maxBackOff = 0 + var lastPollMs = 0L /** - * How low totalUnprocessedMessages has to get before the consumers are - * polled for more data. This is defined to be 10% of - * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to - * 10000, the default refreshThreshold is 1000. + * Total number of unprocessed messages in unprocessedMessagesBySSP. */ - val refreshThreshold = maxMsgsPerStreamPartition * .1 + var totalUnprocessedMessages = 0 debug("Got stream consumers: %s" format consumers) - debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition) debug("Got no new message timeout: %s" format noNewMessagesTimeout) - metrics.setUnprocessedMessages(() => buffer.unprocessedMessages.size) - metrics.setNeededByChooser(() => buffer.neededByChooser.size) metrics.setTimeout(() => timeout) - metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition) - metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout) + metrics.setNeededByChooser(() => emptySystemStreamPartitionsBySystem.size) + metrics.setUnprocessedMessages(() => totalUnprocessedMessages) def start { debug("Starting consumers.") - maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt - - debug("Got maxBackOff: " + maxBackOff) + emptySystemStreamPartitionsBySystem ++= unprocessedMessagesBySSP + .keySet + .groupBy(_.getSystem) + .mapValues(systemStreamPartitions => new HashSet(systemStreamPartitions.toSeq)) consumers .keySet .foreach(metrics.registerSystem) - consumers.values.foreach(_.start) + consumers + .values + .foreach(_.start) + + chooser.start - buffer.start + refresh } def stop { @@ -158,15 +168,14 @@ 
class SystemConsumers( consumers.values.foreach(_.stop) - buffer.stop + chooser.stop } def register(systemStreamPartition: SystemStreamPartition, offset: String) { debug("Registering stream: %s, %s" format (systemStreamPartition, offset)) - metrics.registerSystemStream(systemStreamPartition.getSystemStream) - buffer.register(systemStreamPartition, offset) - updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition) + unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]()) + chooser.register(systemStreamPartition, offset) try { consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset) @@ -175,135 +184,128 @@ class SystemConsumers( } } - /** - * Needs to be be lazy so that we are sure to get the value of maxBackOff assigned - * in start(), rather than its initial value. - */ - lazy val refresh = new DoublingBackOff(maxBackOff) { - def call(): Boolean = { - debug("Refreshing chooser with new messages.") - - // Poll every system for new messages. - consumers.keys.map(poll(_)).contains(true) - } - } - def choose: IncomingMessageEnvelope = { - val envelopeFromChooser = buffer.choose + val envelopeFromChooser = chooser.choose if (envelopeFromChooser == null) { debug("Chooser returned null.") metrics.choseNull.inc - // Allow blocking if the chooser didn't choose a message. + // Sleep for a while so we don't poll in a tight loop. timeout = noNewMessagesTimeout } else { - debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) + val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition - metrics.choseObject.inc + debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) - // Don't block if we have a message to process. + // Ok to give the chooser a new message from this stream. 
timeout = 0 - + metrics.choseObject.inc metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc + + if (!update(systemStreamPartition)) { + emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition) + } } - // Always refresh if we got nothing from the chooser. Otherwise, just - // refresh when the buffer is low. - if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) { - refresh.maybeCall() + if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) { + refresh } - updateMessageChooser envelopeFromChooser } /** - * Poll a system for new messages from SystemStreamPartitions that have - * dipped below the depletedQueueSizeThreshold threshold. Return true if - * any envelopes were found, false if none. + * Poll all SystemStreamPartitions for which there are currently no new + * messages to process. */ - private def poll(systemName: String): Boolean = { + private def poll(systemName: String) { debug("Polling system consumer: %s" format systemName) metrics.systemPolls(systemName).inc - val consumer = consumers(systemName) - debug("Getting fetch map for system: %s" format systemName) - val systemFetchMap = systemFetchMapCache(systemName) + val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName) - debug("Fetching: %s" format systemFetchMap) + // Poll when at least one SSP in this system needs more messages. 
+ if (systemFetchSet.size > 0) { + val consumer = consumers(systemName) - metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchMap.size) + debug("Fetching: %s" format systemFetchSet) - val incomingEnvelopes = consumer.poll(systemFetchMap, timeout) + metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size) - debug("Got incoming message envelopes: %s" format incomingEnvelopes) + val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, timeout) - metrics.systemMessagesPerPoll(systemName).inc + debug("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes) - // We have new un-processed envelopes, so update maps accordingly. - incomingEnvelopes.foreach(envelope => { - val systemStreamPartition = envelope.getSystemStreamPartition + metrics.systemMessagesPerPoll(systemName).inc - val messageEnvelope = try { - Some(serdeManager.fromBytes(envelope)) - } catch { - case e: Exception if !dropDeserializationError => throw new SystemConsumersException("can not deserialize the message", e) - case ex: Throwable => { - debug("Deserialization fails: %s . Drop the error message" format ex) - metrics.deserializationError.inc - None - } - } + val sspAndEnvelopeIterator = systemStreamPartitionEnvelopes.entrySet.iterator - if (!messageEnvelope.isEmpty) { - buffer.update(messageEnvelope.get) - } + while (sspAndEnvelopeIterator.hasNext) { + val sspAndEnvelope = sspAndEnvelopeIterator.next + val systemStreamPartition = sspAndEnvelope.getKey + val envelopes = new ArrayDeque(sspAndEnvelope.getValue) + val numEnvelopes = envelopes.size + totalUnprocessedMessages += numEnvelopes - debug("Got message for: %s, %s" format (systemStreamPartition, envelope)) + if (numEnvelopes > 0) { + unprocessedMessagesBySSP.put(systemStreamPartition, envelopes) - updateFetchMap(systemStreamPartition, -1) + // Update the chooser if it needs a message for this SSP. 
+ if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) { + update(systemStreamPartition) + } + } + } + } else { + debug("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName)) + } + } - debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap)) - }) + private def refresh { + debug("Refreshing chooser with new messages.") - !incomingEnvelopes.isEmpty + // Update last poll time so we don't poll too frequently. + lastPollMs = clock() + + // Poll every system for new messages. + consumers.keys.map(poll(_)) } /** - * A helper method that updates both fetchMap and systemFetchMapCache - * simultaneously. This is a convenience method to make sure that the - * systemFetchMapCache stays in sync with fetchMap. + * Tries to update the message chooser with an envelope from the supplied + * SystemStreamPartition if an envelope is available. */ - private def updateFetchMap(systemStreamPartition: SystemStreamPartition, amount: Int = 1) { - val fetchSize = fetchMap.getOrElse(systemStreamPartition, java.lang.Integer.valueOf(0)).intValue + amount - val systemName = systemStreamPartition.getSystem - var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map()) + private def update(systemStreamPartition: SystemStreamPartition) = { + var updated = false + val q = unprocessedMessagesBySSP.get(systemStreamPartition) + + while (q.size > 0 && !updated) { + val rawEnvelope = q.remove + val deserializedEnvelope = try { + Some(serdeManager.fromBytes(rawEnvelope)) + } catch { + case e: Exception if !dropDeserializationError => + throw new SystemConsumersException("Cannot deserialize an incoming message.", e) + case ex: Exception => + debug("Cannot deserialize an incoming message. 
Dropping the error message.", ex) + metrics.deserializationError.inc + None + } - if (fetchSize >= refreshThreshold) { - systemFetchMap += systemStreamPartition -> fetchSize - } else { - systemFetchMap -= systemStreamPartition - } + if (deserializedEnvelope.isDefined) { + chooser.update(deserializedEnvelope.get) + updated = true + } - fetchMap += systemStreamPartition -> fetchSize - systemFetchMapCache += systemName -> systemFetchMap - } + totalUnprocessedMessages -= 1 + } - /** - * A helper method that updates MessageChooser. This should be called in - * "choose" method after we try to consume a message from MessageChooser. - */ - private def updateMessageChooser { - buffer - .flush - // Let the fetchMap know of any SSPs that were given to the chooser, so - // a new fetch can be triggered if the buffer is low. - .foreach(updateFetchMap(_)) + updated } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala index b065ae6..a63349c 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala @@ -33,10 +33,6 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]() val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, Counter]() - def setUnprocessedMessages(getValue: () => Int) { - newGauge("unprocessed-messages", getValue) - } - def setNeededByChooser(getValue: () => Int) { newGauge("ssps-needed-by-chooser", getValue) } @@ -45,12 +41,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = 
new MetricsRegistry newGauge("poll-timeout", getValue) } - def setMaxMessagesPerStreamPartition(getValue: () => Int) { - newGauge("max-buffered-messages-per-stream-partition", getValue) - } - - def setNoNewMessagesTimeout(getValue: () => Long) { - newGauge("blocking-poll-timeout", getValue) + def setUnprocessedMessages(getValue: () => Int) { + newGauge("unprocessed-messages", getValue) } def registerSystem(systemName: String) { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala deleted file mode 100644 index c7ef6ef..0000000 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.system.chooser - -import scala.collection.mutable.Queue -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.IncomingMessageEnvelope - -/** - * This buffer is responsible for storing new unprocessed - * IncomingMessageEnvelopes, feeding the envelopes to the message chooser, - * and coordinating with the chooser to pick the next message to be processed. - */ -class BufferingMessageChooser(chooser: MessageChooser) extends MessageChooser { - import ChooserStatus._ - - /** - * A buffer of incoming messages grouped by SystemStreamPartition. - */ - var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]() - - /** - * A count of all messages sitting in SystemConsumers' unprocessedMessages - * buffer, and inside the MessageChooser. - */ - var totalUnprocessedMessages = 0 - - /** - * A map that contains the current status for each SSP that's been - * registered to the coordinator. - */ - var statuses = Map[SystemStreamPartition, ChooserStatus]() - - /** - * This is a cache of all SSPs that are currently in the "NeededByChooser" - * state. It's simply here to improve performance, since it means we don't - * need to iterate over all SSPs in the statuses map in order to determine - * which SSPs are currently needed by the chooser. - */ - var neededByChooser = Set[SystemStreamPartition]() - - /** - * Start the chooser that this buffer is managing. - */ - def start = chooser.start - - /** - * Stop the chooser that this buffer is managing. - */ - def stop = chooser.stop - - /** - * Register a new SystemStreamPartition with this buffer, as well as the - * underlying chooser. 
- */ - def register(systemStreamPartition: SystemStreamPartition, offset: String) { - unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]() - chooser.register(systemStreamPartition, offset) - setStatus(systemStreamPartition, NeededByChooser) - } - - /** - * Add a new unprocessed IncomingMessageEnvelope to the buffer. The buffer - * will hold on to this envelope, and eventually feed it to the underlying chooser. - */ - def update(envelope: IncomingMessageEnvelope) { - val systemStreamPartition = envelope.getSystemStreamPartition - - unprocessedMessages(envelope.getSystemStreamPartition).enqueue(envelope) - totalUnprocessedMessages += 1 - - if (statuses(systemStreamPartition).equals(SkippingChooser)) { - // If we were skipping messages from this SystemStreamPartition, update - // neededByChooser since we've got messages for it now. - setStatus(systemStreamPartition, NeededByChooser) - } - } - - /** - * Tell the buffer to update the underlying chooser with any SSPs that the - * chooser needs. - * - * @return A set of SystemStreamPartitions that were updated in the chooser - * as a result of this method invocation. - */ - def flush: Set[SystemStreamPartition] = { - var updatedSystemStreamPartitions = Set[SystemStreamPartition]() - - neededByChooser.foreach(systemStreamPartition => { - if (unprocessedMessages(systemStreamPartition).size > 0) { - // If we have messages for a stream that the chooser needs, then update. - chooser.update(unprocessedMessages(systemStreamPartition).dequeue) - updatedSystemStreamPartitions += systemStreamPartition - setStatus(systemStreamPartition, InChooser) - } else { - // If we find that we have no messages for this SystemStreamPartition, - // rather than continue trying to update the chooser with this - // SystemStreamPartition, add it to the skip set and remove it from - // the neededByChooser set (see below). 
- setStatus(systemStreamPartition, SkippingChooser) - } - }) - - updatedSystemStreamPartitions - } - - /** - * Choose a message from the underlying chooser, and return it. - * - * @return The IncomingMessageEnvelope that the chooser has picked, or null - * if the chooser didn't pick anything. - */ - def choose = { - val envelope = chooser.choose - - if (envelope != null) { - setStatus(envelope.getSystemStreamPartition, NeededByChooser) - - // Chooser picked a message, so we've got one less unprocessed message. - totalUnprocessedMessages -= 1 - } - - envelope - } - - /** - * Update the status of a SystemStreamPartition. - */ - private def setStatus(systemStreamPartition: SystemStreamPartition, status: ChooserStatus) { - statuses += systemStreamPartition -> status - - if (status.equals(NeededByChooser)) { - neededByChooser += systemStreamPartition - } else { - neededByChooser -= systemStreamPartition - } - } -} - -/** - * ChooserStatus denotes the current state of a SystemStreamPartition for a - * MessageChooser. This state is used to improve performance in the buffer. - * To update a MessageChooser, we first check if an envelope exists for each - * SSP that the buffer needs. If the buffer contains a lot of empty queues, - * then the operation of iterating over all needed SSPs, and discovering that - * their queues are empty is a waste of time, since they'll remain empty until - * a new envelope is added to the queue, which the buffer knows by virtue of - * having access to the enqueue method. Instead, we stop checking for an empty - * SSP (SkippingChooser), until a new envelope is added via the enqueue method. - */ -object ChooserStatus extends Enumeration { - type ChooserStatus = Value - - /** - * When an envelope has been updated for the MessageChooser, the - * SystemStreamPartition for the envelope should be set to the InChooser - * state. 
The state will remain this way until the MessageChooser returns an - * envelope with the same SystemStreamPartition, at which point, the - * SystemStreamPartition's state should be transitioned to NeededByChooser - * (see below). - */ - val InChooser = Value - - /** - * If a SystemStreamPartition is not in the InChooser state, and it's - * unclear if the buffer has more messages available for the SSP, the SSP - * should be in the NeededByChooser state. This state means that the chooser - * should be updated with a new message from the SSP, if one is available. - */ - val NeededByChooser = Value - - /** - * When a SystemStreamPartition is in the NeededByChooser state, and we try - * to update the message chooser with a new envelope from the buffer for the - * SSP, there are two potential outcomes. One is that there is an envelope - * in the buffer for the SSP. In this case, the state will be transitioned - * to InChooser. The other possibility is that there is no envelope in the - * buffer at the time the update is trying to occur. In the latter case, the - * SSP is transitioned to the SkippingChooser state. This state means that - * the buffer will cease to update the chooser with envelopes from this SSP - * until a new envelope for the SSP is added to the buffer again. - * - * <br/><br/> - * - * The reason that this state exists is purely to improve performance. If we - * simply leave SSPs in the NeededByChooser state, we will try and update the - * chooser on every updateChooser call for all SSPs. This is a waste of time - * if the buffer contains a large number of empty SSPs. 
- */ - val SkippingChooser = Value -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala index 5374121..4ecf1f2 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala @@ -26,7 +26,6 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.metrics.ReadableMetricsRegistry - import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.metrics.MetricsHelper @@ -42,13 +41,6 @@ import org.apache.samza.metrics.MetricsHelper class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChooserMetrics) extends BaseMessageChooser { /** - * SystemStreamPartitions that the chooser has received a message for, but - * have not yet returned. Envelopes for these SystemStreamPartitions should - * be in the queue. - */ - var inflightSystemStreamPartitions = Set[SystemStreamPartition]() - - /** * Queue of potential messages to process. Round robin will always choose * the message at the head of the queue. A queue can be used to implement * round robin here because we only get one envelope per @@ -61,29 +53,12 @@ class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChoose } def update(envelope: IncomingMessageEnvelope) = { - if (inflightSystemStreamPartitions.contains(envelope.getSystemStreamPartition)) { - throw new SamzaException("Received more than one envelope from the same " - + "SystemStreamPartition without returning the last. 
This is a " - + "violation of the contract with SystemConsumers, and breaks this " - + "RoundRobin implementation.") - } - q.add(envelope) - inflightSystemStreamPartitions += envelope.getSystemStreamPartition } - def choose = { - val envelope = q.poll - - if (envelope != null) { - inflightSystemStreamPartitions -= envelope.getSystemStreamPartition - } - - envelope - } + def choose = q.poll } - class RoundRobinChooserMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { def setBufferedMessages(getValue: () => Int) { newGauge("buffered-messages", getValue) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala b/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala deleted file mode 100644 index e1d6d4c..0000000 --- a/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.util - -/** - * Perform the provided action (via the call method) and, if it returns true, - * perform it again, the next time. If, however, call returns false, do not - * perform call the next time, instead wait 2*n calls before actually calling, - * with n increasing to the maximum specified in the constructor. - - * @param maxBackOff Absolute maximum number of calls to call before actually performing call. - */ -abstract class DoublingBackOff(maxBackOff:Int = 64) { - var invocationsBeforeCall = 0 - var currentBackOff = 0 - - /** - * Method to invoke and whose return value will determine the next time - * it is called again. - */ - def call():Boolean - - /** - * Possibly execute the call method, based on the result of the previous run. - */ - def maybeCall():Unit = { - if(invocationsBeforeCall == 0) { - if (call()) { - // call succeeded so reset backoff - currentBackOff = 0 - } else { - // Call failed, so start backing off - currentBackOff = scala.math.min(maxBackOff, nextBackOff(currentBackOff)) - invocationsBeforeCall = currentBackOff - } - } else { - invocationsBeforeCall -= 1 - } - - } - - // 2 * 0 == 0, making getting started a wee bit hard, so we need a little help with that first back off - private def nextBackOff(i:Int) = if(i == 0) 1 else 2 * i - -} - http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index 97e65eb..04229a6 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -27,39 +27,153 @@ import org.apache.samza.system.chooser.MessageChooser import 
org.apache.samza.system.chooser.DefaultChooser import org.apache.samza.util.BlockingEnvelopeMap import org.apache.samza.serializers._ +import org.apache.samza.system.chooser.MockMessageChooser class TestSystemConsumers { + def testPollIntervalMs { + val numEnvelopes = 1000 + val system = "test-system" + val systemStreamPartition0 = new SystemStreamPartition(system, "some-stream", new Partition(0)) + val systemStreamPartition1 = new SystemStreamPartition(system, "some-stream", new Partition(1)) + val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v") + val consumer = new CustomPollResponseSystemConsumer(envelope) + var now = 0L + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now) + + consumers.register(systemStreamPartition0, "0") + consumers.register(systemStreamPartition1, "1234") + consumers.start + + // Tell the consumer to respond with 1000 messages for SSP0, and no + // messages for SSP1. + consumer.setResponseSizes(numEnvelopes) + + // Choose to trigger a refresh with data. + assertNull(consumers.choose) + // 2: First on start, second on choose. + assertEquals(2, consumer.polls) + assertEquals(2, consumer.lastPoll.size) + assertTrue(consumer.lastPoll.contains(systemStreamPartition0)) + assertTrue(consumer.lastPoll.contains(systemStreamPartition1)) + assertEquals(envelope, consumers.choose) + assertEquals(envelope, consumers.choose) + // We aren't polling because we're getting non-null envelopes. + assertEquals(2, consumer.polls) + + // Advance the clock to trigger a new poll even though there are still + // messages. + now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS + + assertEquals(envelope, consumers.choose) + + // We polled even though there are still 997 messages in the unprocessed + // message buffer. + assertEquals(3, consumer.polls) + assertEquals(1, consumer.lastPoll.size) + + // Only SSP1 was polled because we still have messages for SSP2. 
+ assertTrue(consumer.lastPoll.contains(systemStreamPartition1)) + + // Now drain all messages for SSP0. There should be exactly 997 messages, + // since we have chosen 3 already, and we started with 1000. + (0 until (numEnvelopes - 3)).foreach { i => + assertEquals(envelope, consumers.choose) + } + + // Nothing left. Should trigger a poll here. + assertNull(consumers.choose) + assertEquals(4, consumer.polls) + assertEquals(2, consumer.lastPoll.size) + + // Now we ask for messages from both again. + assertTrue(consumer.lastPoll.contains(systemStreamPartition0)) + assertTrue(consumer.lastPoll.contains(systemStreamPartition1)) + } + + def testBasicSystemConsumersFunctionality { + val system = "test-system" + val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1)) + val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v") + val consumer = new CustomPollResponseSystemConsumer(envelope) + var now = 0 + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now) + + consumers.register(systemStreamPartition, "0") + consumers.start + + // Start should trigger a poll to the consumer. + assertEquals(1, consumer.polls) + + // Tell the consumer to start returning messages when polled. + consumer.setResponseSizes(1) + + // Choose to trigger a refresh with data. + assertNull(consumers.choose) + + // Choose should have triggered a second poll, since no messages are available. + assertEquals(2, consumer.polls) + + // Choose a few times. This time there is no data. + assertEquals(envelope, consumers.choose) + assertNull(consumers.choose) + assertNull(consumers.choose) + + // Return more than one message this time. + consumer.setResponseSizes(2) + + // Choose to trigger a refresh with data. + assertNull(consumers.choose) + + // Increase clock interval. + now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS + + // We get two messages now. 
+ assertEquals(envelope, consumers.choose) + // Should not poll even though clock interval increases past interval threshold. + assertEquals(2, consumer.polls) + assertEquals(envelope, consumers.choose) + assertNull(consumers.choose) + } + @Test def testSystemConumersShouldRegisterStartAndStopChooser { val system = "test-system" val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1)) - var started = 0 - var stopped = 0 - var registered = Map[SystemStreamPartition, String]() + var consumerStarted = 0 + var consumerStopped = 0 + var consumerRegistered = Map[SystemStreamPartition, String]() + var chooserStarted = 0 + var chooserStopped = 0 + var chooserRegistered = Map[SystemStreamPartition, String]() val consumer = Map(system -> new SystemConsumer { - def start {} - def stop {} - def register(systemStreamPartition: SystemStreamPartition, offset: String) {} - def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List() + def start = consumerStarted += 1 + def stop = consumerStopped += 1 + def register(systemStreamPartition: SystemStreamPartition, offset: String) = consumerRegistered += systemStreamPartition -> offset + def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]() }) val consumers = new SystemConsumers(new MessageChooser { def update(envelope: IncomingMessageEnvelope) = Unit def choose = null - def start = started += 1 - def stop = stopped += 1 - def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset + def start = chooserStarted += 1 + def stop = chooserStopped += 1 + def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset }, consumer, null) consumers.register(systemStreamPartition, "0") consumers.start consumers.stop - 
assertEquals(1, started) - assertEquals(1, stopped) - assertEquals(1, registered.size) - assertEquals("0", registered(systemStreamPartition)) + assertEquals(1, chooserStarted) + assertEquals(1, chooserStopped) + assertEquals(1, chooserRegistered.size) + assertEquals("0", chooserRegistered(systemStreamPartition)) + + assertEquals(1, consumerStarted) + assertEquals(1, consumerStopped) + assertEquals(1, consumerRegistered.size) + assertEquals("0", consumerRegistered(systemStreamPartition)) } @Test @@ -76,7 +190,7 @@ class TestSystemConsumers { def start {} def stop {} def register(systemStreamPartition: SystemStreamPartition, offset: String) {} - def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List() + def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]() }) val consumers = new SystemConsumers(new MessageChooser { def update(envelope: IncomingMessageEnvelope) = Unit @@ -102,16 +216,16 @@ class TestSystemConsumers { val system = "test-system" val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1)) val msgChooser = new DefaultChooser - val consumer = Map(system -> new SimpleConsumer) + val consumer = Map(system -> new SerializingConsumer) val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]); val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes) - // it should throw exceptions when the deserialization has error + // throw exceptions when the deserialization has error val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false) consumers.register(systemStreamPartition, "0") - consumers.start consumer(system).putBytesMessage consumer(system).putStringMessage + consumers.start var caughtRightException = false try { @@ -142,10 +256,35 @@ class TestSystemConsumers { 
} /** - * a simple consumer that provides two extra methods, one is to put bytes format message - * and the other to put string format message + * A simple MockSystemConsumer that keeps track of what was polled, and lets + * you define how many envelopes to return in the poll response. You can + * supply the envelope to use for poll responses through the constructor. + */ + private class CustomPollResponseSystemConsumer(envelope: IncomingMessageEnvelope) extends SystemConsumer { + var polls = 0 + var pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]() + var lastPoll: java.util.Set[SystemStreamPartition] = null + def start {} + def stop {} + def register(systemStreamPartition: SystemStreamPartition, offset: String) {} + def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = { + polls += 1 + lastPoll = systemStreamPartitions + pollResponse + } + def setResponseSizes(numEnvelopes: Int) { + val q = new java.util.ArrayList[IncomingMessageEnvelope]() + (0 until numEnvelopes).foreach { i => q.add(envelope) } + pollResponse = Map(envelope.getSystemStreamPartition -> q) + pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]() + } + } + + /** + * A simple consumer that provides two extra methods: one is to put bytes + * format message and the other to put string format message. 
*/ - private class SimpleConsumer extends BlockingEnvelopeMap { + private class SerializingConsumer extends BlockingEnvelopeMap { val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1)) def putBytesMessage { put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes())) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala deleted file mode 100644 index c96c53b..0000000 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.system.chooser - -import org.apache.samza.system.chooser.ChooserStatus._ -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.Partition -import org.junit.Assert._ -import org.junit.Test - -class TestBufferingMessageChooser { - @Test - def testShouldBufferAndFlush { - val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) - val chooser = new MockMessageChooser - val buffer = new BufferingMessageChooser(chooser) - val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null) - buffer.register(ssp1, "1") - assertEquals(1, chooser.registers.size) - assertEquals("1", chooser.registers.getOrElse(ssp1, "0")) - buffer.start - assertEquals(1, chooser.starts) - assertEquals(null, buffer.choose) - buffer.update(envelope1) - // Should buffer this update, rather than passing it to the wrapped chooser. - assertEquals(null, buffer.choose) - buffer.flush - assertEquals(envelope1, buffer.choose) - assertEquals(null, buffer.choose) - buffer.stop - assertEquals(1, chooser.stops) - } - - @Test - def testBufferShouldSkipCheckedSSPs { - val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) - val chooser = new MockMessageChooser - val buffer = new BufferingMessageChooser(chooser) - val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null) - buffer.register(ssp1, "1") - buffer.start - checkChooserStatus(NeededByChooser, ssp1, buffer) - - // Buffer first message. Still needed. - buffer.update(envelope1) - checkChooserStatus(NeededByChooser, ssp1, buffer) - assertEquals(null, buffer.choose) - checkChooserStatus(NeededByChooser, ssp1, buffer) - - // Flush first message. Now in chooser. - buffer.flush - checkChooserStatus(InChooser, ssp1, buffer) - assertEquals(envelope1, buffer.choose) - checkChooserStatus(NeededByChooser, ssp1, buffer) - - // Now flush with no messages. 
Should begin skipping chooser since no - // messages are available. - assertEquals(null, buffer.choose) - checkChooserStatus(NeededByChooser, ssp1, buffer) - buffer.flush - checkChooserStatus(SkippingChooser, ssp1, buffer) - - // Now check that we can get back to NeededByChooser when a new message - // arrives. - buffer.update(envelope1) - checkChooserStatus(NeededByChooser, ssp1, buffer) - assertEquals(0, chooser.envelopes.size) - assertEquals(null, buffer.choose) - - // And check that we can get back into the InChooser state. - buffer.flush - checkChooserStatus(InChooser, ssp1, buffer) - assertEquals(envelope1, buffer.choose) - checkChooserStatus(NeededByChooser, ssp1, buffer) - - // Shutdown. - buffer.stop - assertEquals(1, chooser.stops) - } - - private def checkChooserStatus(status: ChooserStatus, systemStreamPartition: SystemStreamPartition, buffer: BufferingMessageChooser) { - if (status.equals(NeededByChooser)) { - assertEquals(Set(systemStreamPartition), buffer.neededByChooser) - } else { - assertTrue(!buffer.neededByChooser.contains(systemStreamPartition)) - } - - assertEquals(status, buffer.statuses.getOrElse(systemStreamPartition, null)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala index b2e04a7..d31c3ce 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala @@ -88,39 +88,42 @@ class TestFileReaderSystemConsumer { consumer.start Thread.sleep(500) - val number: Integer = 1000 
- val ssp1Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp1 -> number) - val ssp2Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp2 -> number) - val ssp3Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp3 -> number) - val ssp4Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp4 -> number) - val ssp5Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp5 -> number) - - val ssp1Result = consumer.poll(ssp1Number, 1000) - val ssp2Result = consumer.poll(ssp2Number, 1000) - val ssp3Result = consumer.poll(ssp3Number, 1000) - val ssp4Result = consumer.poll(ssp4Number, 1000) + val ssp1Result = consumer.poll(Set(ssp1), 1000) + val ssp2Result = consumer.poll(Set(ssp2), 1000) + val ssp3Result = consumer.poll(Set(ssp3), 1000) + val ssp4Result = consumer.poll(Set(ssp4), 1000) assertEquals(0, ssp1Result.size) assertEquals(0, ssp2Result.size) assertEquals(1, ssp3Result.size) - assertEquals("first line ", ssp3Result(0).getMessage) - assertEquals("0", ssp3Result(0).getOffset) + assertEquals(1, ssp3Result.get(ssp3).size) + var envelope = ssp3Result.get(ssp3).remove(0) + assertEquals("first line ", envelope.getMessage) + assertEquals("0", envelope.getOffset) assertEquals(1, ssp4Result.size) - assertEquals("second line ", ssp4Result(0).getMessage) - assertEquals("12", ssp4Result(0).getOffset) + assertEquals(1, ssp4Result.get(ssp4).size) + envelope = ssp4Result.get(ssp4).remove(0) + assertEquals("second line ", envelope.getMessage) + assertEquals("12", envelope.getOffset) appendFile Thread.sleep(1000) // ssp5 should read the new lines - val ssp5Result = consumer.poll(ssp5Number, 1000) - assertEquals(3, ssp5Result.size) - assertEquals("This is a new line", ssp5Result(2).getMessage) - assertEquals("50", ssp5Result(2).getOffset) - assertEquals("other lines ", ssp5Result(1).getMessage) - assertEquals("37", ssp5Result(1).getOffset) + val ssp5Result = consumer.poll(Set(ssp5), 1000) + assertEquals(1, 
ssp5Result.size) + assertEquals(3, ssp5Result.get(ssp5).size) + envelope = ssp5Result.get(ssp5).remove(0) + assertEquals("third line ", envelope.getMessage) + assertEquals("25", envelope.getOffset) + envelope = ssp5Result.get(ssp5).remove(0) + assertEquals("other lines ", envelope.getMessage) + assertEquals("37", envelope.getOffset) + envelope = ssp5Result.get(ssp5).remove(0) + assertEquals("This is a new line", envelope.getMessage) + assertEquals("50", envelope.getOffset) consumer.stop } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala deleted file mode 100644 index eaeb005..0000000 --- a/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.util - -import org.junit.Test -import org.junit.Assert._ - -class TestDoublingBackOff { - @Test def zeroBackOffWorks() { - var counter = 0 - val zeroBackOff = new DoublingBackOff(0) { - def call(): Boolean = { - counter += 1 - true - } - } - - for(i <- 0 to 1000) { - assertEquals(i, counter) - zeroBackOff.maybeCall() - } - } - - @Test def backOffWorks() { - val toReturn = List(true, false, true, true) - var counter = 0 - val ebo = new DoublingBackOff() { - def call(): Boolean = { - counter += 1 - toReturn(counter - 1) - } - } - - ebo.maybeCall() // will get back true - assertEquals(1, counter) - ebo.maybeCall() // will get back false - assertEquals(2, counter) - ebo.maybeCall() // last false means we won't actually call, will hold off for one iteration - assertEquals(2, counter) - ebo.maybeCall() // and call on this one, which gives back true - assertEquals(3, counter) - ebo.maybeCall() // so we immediately call again and increment - assertEquals(4, counter) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala index 23d122e..1661b43 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala @@ -74,7 +74,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging { /** * How many messages to process before shutting down. */ - var maxMessages = 100000 + var maxMessages = 10000000 /** * If defined, incoming messages will be forwarded to this SystemStream. 
If @@ -84,7 +84,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging { def init(config: Config, context: TaskContext) { logInterval = config.getInt("task.log.interval", 10000) - maxMessages = config.getInt("task.max.messages", 100000) + maxMessages = config.getInt("task.max.messages", 10000000) outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_)) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala index 4016768..1f4c247 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala @@ -75,7 +75,7 @@ class TestSamzaContainerPerformance extends Logging{ val partitionsPerStreamCount = System.getProperty("samza.mock.partitions.per.stream", "4").toInt val brokerSleepMs = System.getProperty("samza.mock.broker.sleep.ms", "1").toInt var logInterval = System.getProperty("samza.task.log.interval", "10000").toInt - var maxMessages = System.getProperty("samza.task.max.messages", "100000").toInt + var maxMessages = System.getProperty("samza.task.max.messages", "10000000").toInt val jobConfig = Map( "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",
