Repository: incubator-samza Updated Branches: refs/heads/master 73d604c43 -> 5c5a95c33
SAMZA-220; make systemconsumers faster when consuming from a large number of partitions. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5c5a95c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5c5a95c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5c5a95c3 Branch: refs/heads/master Commit: 5c5a95c33b4e254108c25eecd6bf0a7786bd4d4f Parents: 73d604c Author: Chris Riccomini <[email protected]> Authored: Tue Apr 22 14:18:54 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Apr 22 14:18:54 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/system/SystemConsumers.scala | 102 ++++----- .../chooser/BufferingMessageChooser.scala | 208 +++++++++++++++++++ .../chooser/TestBufferingMessageChooser.scala | 108 ++++++++++ 3 files changed, 353 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index bbbacb5..7624aef 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -25,6 +25,7 @@ import org.apache.samza.serializers.SerdeManager import grizzled.slf4j.Logging import org.apache.samza.system.chooser.MessageChooser import org.apache.samza.util.DoublingBackOff +import org.apache.samza.system.chooser.BufferingMessageChooser /** * The SystemConsumers class coordinates between all SystemConsumers, the @@ -59,16 +60,7 @@ class SystemConsumers( /** * The maximum
number of messages to poll from a single SystemStreamPartition. */ - maxMsgsPerStreamPartition: Int = 1000, - - /** - * A percentage threshold that determines when a SystemStreamPartition - * should be polled again. 0.0 means poll for more messages only when - * SystemConsumer's buffer is totally empty. 0.2 means poll for more messages - * when SystemConsumers' buffer is 80% empty. SystemConsumers' buffer size - * is determined by maxMsgsPerStreamPartition. - */ - fetchThresholdPct: Float = 0f, + maxMsgsPerStreamPartition: Int = 10000, /** * If MessageChooser returns null when it's polled, SystemConsumers will @@ -80,18 +72,9 @@ class SystemConsumers( noNewMessagesTimeout: Long = 10) extends Logging { /** - * A buffer of incoming messages grouped by SystemStreamPartition. - */ - var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]() - - /** - * The MessageChooser only gets updated with one message-per-SystemStreamPartition - * at a time. The MessageChooser will not receive a second message from the - * same SystemStreamPartition until the first message that it received has - * been returned to SystemConsumers. This set keeps track of which - * SystemStreamPartitions are valid to give to the MessageChooser. + * The buffer where SystemConsumers stores all incoming message envelopes. */ - var neededByChooser = Set[SystemStreamPartition]() + val buffer = new BufferingMessageChooser(chooser) /** * A map of every SystemStreamPartition that SystemConsumers is responsible @@ -122,33 +105,26 @@ class SystemConsumers( var timeout = noNewMessagesTimeout /** - * Used to determine when the next poll should take place for a given - * SystemStreamPartition. SystemConsumers inspects the value of fetchMap for each - * SystemStreamPartition, and decides to poll for the SystemStreamPartition - * if the fetchMap value is greater than or equal to the - * depletedQueueSizeThreshold. 
For example, suppose the fetchThresholdPct is - * 0.2, and the maxMsgsPerStreamPartition is 1000. This would result in - * depletedQueueSizeThreshold being 800. This a SystemStreamPartition with a - * fetchMap value of 936 (164 messages in the buffer is less than 20% of - * 1000) would be polled for more messages, while a SystemStream partition - * with a fetchMap value of 548 would not be polled for more messages (452 - * messages in the buffer is greater than 20% of 1000). - */ - val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt - - /** * Make the maximum backoff proportional to the number of streams we're consuming. * For a few streams, make the max back off 1, but for hundreds make it up to 1k, * which experimentally has shown to be the most performant. */ var maxBackOff = 0 - + + /** + * How low totalUnprocessedMessages has to get before the consumers are + * polled for more data. This is defined to be 10% of + * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to + * 10000, the default refreshThreshold is 1000. 
+ */ + val refreshThreshold = maxMsgsPerStreamPartition * .1 + debug("Got stream consumers: %s" format consumers) debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition) debug("Got no new message timeout: %s" format noNewMessagesTimeout) - metrics.setUnprocessedMessages(() => fetchMap.values.map(maxMsgsPerStreamPartition - _.intValue).sum) - metrics.setNeededByChooser(() => neededByChooser.size) + metrics.setUnprocessedMessages(() => buffer.unprocessedMessages.size) + metrics.setNeededByChooser(() => buffer.neededByChooser.size) metrics.setTimeout(() => timeout) metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition) metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout) @@ -159,14 +135,14 @@ class SystemConsumers( maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt debug("Got maxBackOff: " + maxBackOff) - + consumers .keySet .foreach(metrics.registerSystem) consumers.values.foreach(_.start) - chooser.start + buffer.start } def stop { @@ -174,18 +150,16 @@ class SystemConsumers( consumers.values.foreach(_.stop) - chooser.stop + buffer.stop } def register(systemStreamPartition: SystemStreamPartition, offset: String) { debug("Registering stream: %s, %s" format (systemStreamPartition, offset)) metrics.registerSystemStream(systemStreamPartition.getSystemStream) - neededByChooser += systemStreamPartition + buffer.register(systemStreamPartition, offset) updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition) - unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]() consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset) - chooser.register(systemStreamPartition, offset) } /** @@ -202,7 +176,7 @@ class SystemConsumers( } def choose: IncomingMessageEnvelope = { - val envelopeFromChooser = chooser.choose + val envelopeFromChooser = buffer.choose if (envelopeFromChooser == null) { debug("Chooser returned null.") @@ -219,17 +193,19 @@ class 
SystemConsumers( // Don't block if we have a message to process. timeout = 0 - // Ok to give the chooser a new message from this stream. - neededByChooser += envelopeFromChooser.getSystemStreamPartition - metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc } - refresh.maybeCall() + // Always refresh if we got nothing from the chooser. Otherwise, just + // refresh when the buffer is low. + if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) { + refresh.maybeCall() + } + updateMessageChooser envelopeFromChooser } - + /** * Poll a system for new messages from SystemStreamPartitions that have * dipped below the depletedQueueSizeThreshold threshold. Return true if @@ -260,15 +236,13 @@ class SystemConsumers( incomingEnvelopes.foreach(envelope => { val systemStreamPartition = envelope.getSystemStreamPartition + buffer.update(serdeManager.fromBytes(envelope)) + debug("Got message for: %s, %s" format (systemStreamPartition, envelope)) updateFetchMap(systemStreamPartition, -1) debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap)) - - unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope)) - - debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages)) }) !incomingEnvelopes.isEmpty @@ -284,7 +258,7 @@ class SystemConsumers( val systemName = systemStreamPartition.getSystem var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map()) - if (fetchSize >= depletedQueueSizeThreshold) { + if (fetchSize >= refreshThreshold) { systemFetchMap += systemStreamPartition -> fetchSize } else { systemFetchMap -= systemStreamPartition @@ -293,18 +267,16 @@ class SystemConsumers( fetchMap += systemStreamPartition -> fetchSize systemFetchMapCache += systemName -> systemFetchMap } - + /** - * A helper method that updates MessageChooser. 
This should be called in + * A helper method that updates MessageChooser. This should be called in * "choose" method after we try to consume a message from MessageChooser. */ private def updateMessageChooser { - neededByChooser.foreach(systemStreamPartition => - // If we have messages for a stream that the chooser needs, then update. - if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) { - chooser.update(unprocessedMessages(systemStreamPartition).dequeue) - updateFetchMap(systemStreamPartition) - neededByChooser -= systemStreamPartition - }) + buffer + .flush + // Let the fetchMap know of any SSPs that were given to the chooser, so + // a new fetch can be triggered if the buffer is low. + .foreach(updateFetchMap(_)) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala new file mode 100644 index 0000000..c7ef6ef --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.chooser + +import scala.collection.mutable.Queue +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.IncomingMessageEnvelope + +/** + * This buffer is responsible for storing new unprocessed + * IncomingMessageEnvelopes, feeding the envelopes to the message chooser, + * and coordinating with the chooser to pick the next message to be processed. + */ +class BufferingMessageChooser(chooser: MessageChooser) extends MessageChooser { + import ChooserStatus._ + + /** + * A buffer of incoming messages grouped by SystemStreamPartition. + */ + var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]() + + /** + * A count of all messages sitting in this buffer's unprocessedMessages + * queues, plus those handed to the MessageChooser but not yet chosen. + */ + var totalUnprocessedMessages = 0 + + /** + * A map that contains the current status for each SSP that's been + * registered with this buffer. + */ + var statuses = Map[SystemStreamPartition, ChooserStatus]() + + /** + * This is a cache of all SSPs that are currently in the "NeededByChooser" + * state. It's simply here to improve performance, since it means we don't + * need to iterate over all SSPs in the statuses map in order to determine + * which SSPs are currently needed by the chooser. + */ + var neededByChooser = Set[SystemStreamPartition]() + + /** + * Start the chooser that this buffer is managing. + */ + def start = chooser.start + + /** + * Stop the chooser that this buffer is managing.
+ */ + def stop = chooser.stop + + /** + * Register a new SystemStreamPartition with this buffer, as well as the + * underlying chooser. + */ + def register(systemStreamPartition: SystemStreamPartition, offset: String) { + unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]() + chooser.register(systemStreamPartition, offset) + setStatus(systemStreamPartition, NeededByChooser) + } + + /** + * Add a new unprocessed IncomingMessageEnvelope to the buffer. The buffer + * will hold on to this envelope, and eventually feed it to the underlying chooser. + */ + def update(envelope: IncomingMessageEnvelope) { + val systemStreamPartition = envelope.getSystemStreamPartition + + unprocessedMessages(envelope.getSystemStreamPartition).enqueue(envelope) + totalUnprocessedMessages += 1 + + if (statuses(systemStreamPartition).equals(SkippingChooser)) { + // If we were skipping messages from this SystemStreamPartition, update + // neededByChooser since we've got messages for it now. + setStatus(systemStreamPartition, NeededByChooser) + } + } + + /** + * Tell the buffer to update the underlying chooser with any SSPs that the + * chooser needs. + * + * @return A set of SystemStreamPartitions that were updated in the chooser + * as a result of this method invocation. + */ + def flush: Set[SystemStreamPartition] = { + var updatedSystemStreamPartitions = Set[SystemStreamPartition]() + + neededByChooser.foreach(systemStreamPartition => { + if (unprocessedMessages(systemStreamPartition).size > 0) { + // If we have messages for a stream that the chooser needs, then update. 
+ chooser.update(unprocessedMessages(systemStreamPartition).dequeue) + updatedSystemStreamPartitions += systemStreamPartition + setStatus(systemStreamPartition, InChooser) + } else { + // If we find that we have no messages for this SystemStreamPartition, + // rather than continue trying to update the chooser with this + // SystemStreamPartition, move it to the SkippingChooser state, which + // also removes it from the neededByChooser set (see setStatus below). + setStatus(systemStreamPartition, SkippingChooser) + } + }) + + updatedSystemStreamPartitions + } + + /** + * Choose a message from the underlying chooser, and return it. + * + * @return The IncomingMessageEnvelope that the chooser has picked, or null + * if the chooser didn't pick anything. + */ + def choose = { + val envelope = chooser.choose + + if (envelope != null) { + setStatus(envelope.getSystemStreamPartition, NeededByChooser) + + // Chooser picked a message, so we've got one less unprocessed message. + totalUnprocessedMessages -= 1 + } + + envelope + } + + /** + * Update the status of a SystemStreamPartition. + */ + private def setStatus(systemStreamPartition: SystemStreamPartition, status: ChooserStatus) { + statuses += systemStreamPartition -> status + + if (status.equals(NeededByChooser)) { + neededByChooser += systemStreamPartition + } else { + neededByChooser -= systemStreamPartition + } + } +} + +/** + * ChooserStatus denotes the current state of a SystemStreamPartition for a + * MessageChooser. This state is used to improve performance in the buffer. + * To update a MessageChooser, we first check if an envelope exists for each + * SSP that the buffer needs. If the buffer contains a lot of empty queues, + * then the operation of iterating over all needed SSPs and discovering that + * their queues are empty is a waste of time, since they'll remain empty until + * a new envelope is added to the queue, which the buffer knows by virtue of + * being the one that enqueues envelopes (in its update method).
Instead, we stop checking for an empty + * SSP (SkippingChooser), until a new envelope is added via the update method. + */ +object ChooserStatus extends Enumeration { + type ChooserStatus = Value + + /** + * When an envelope has been given to the MessageChooser, the + * SystemStreamPartition for the envelope should be set to the InChooser + * state. The state will remain this way until the MessageChooser returns an + * envelope with the same SystemStreamPartition, at which point, the + * SystemStreamPartition's state should be transitioned to NeededByChooser + * (see below). + */ + val InChooser = Value + + /** + * If a SystemStreamPartition is not in the InChooser state, and it's + * unclear if the buffer has more messages available for the SSP, the SSP + * should be in the NeededByChooser state. This state means that the chooser + * should be updated with a new message from the SSP, if one is available. + */ + val NeededByChooser = Value + + /** + * When a SystemStreamPartition is in the NeededByChooser state, and we try + * to update the message chooser with a new envelope from the buffer for the + * SSP, there are two potential outcomes. One is that there is an envelope + * in the buffer for the SSP. In this case, the state will be transitioned + * to InChooser. The other possibility is that there is no envelope in the + * buffer at the time the update is trying to occur. In the latter case, the + * SSP is transitioned to the SkippingChooser state. This state means that + * the buffer will cease to update the chooser with envelopes from this SSP + * until a new envelope for the SSP is added to the buffer again. + * + * <br/><br/> + * + * The reason that this state exists is purely to improve performance. If we + * simply leave SSPs in the NeededByChooser state, we will try to update the + * chooser on every flush call for all SSPs. This is a waste of time + * if the buffer contains a large number of empty SSPs.
+ */ + val SkippingChooser = Value +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala new file mode 100644 index 0000000..c96c53b --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.chooser + +import org.apache.samza.system.chooser.ChooserStatus._ +import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition +import org.junit.Assert._ +import org.junit.Test + +class TestBufferingMessageChooser { + @Test + def testShouldBufferAndFlush { + val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) + val chooser = new MockMessageChooser + val buffer = new BufferingMessageChooser(chooser) + val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null) + buffer.register(ssp1, "1") + assertEquals(1, chooser.registers.size) + assertEquals("1", chooser.registers.getOrElse(ssp1, "0")) + buffer.start + assertEquals(1, chooser.starts) + assertEquals(null, buffer.choose) + buffer.update(envelope1) + // Should buffer this update, rather than passing it to the wrapped chooser. + assertEquals(null, buffer.choose) + buffer.flush + assertEquals(envelope1, buffer.choose) + assertEquals(null, buffer.choose) + buffer.stop + assertEquals(1, chooser.stops) + } + + @Test + def testBufferShouldSkipCheckedSSPs { + val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) + val chooser = new MockMessageChooser + val buffer = new BufferingMessageChooser(chooser) + val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null) + buffer.register(ssp1, "1") + buffer.start + checkChooserStatus(NeededByChooser, ssp1, buffer) + + // Buffer first message. Still needed. + buffer.update(envelope1) + checkChooserStatus(NeededByChooser, ssp1, buffer) + assertEquals(null, buffer.choose) + checkChooserStatus(NeededByChooser, ssp1, buffer) + + // Flush first message. Now in chooser. + buffer.flush + checkChooserStatus(InChooser, ssp1, buffer) + assertEquals(envelope1, buffer.choose) + checkChooserStatus(NeededByChooser, ssp1, buffer) + + // Now flush with no messages. 
Should begin skipping chooser since no + // messages are available. + assertEquals(null, buffer.choose) + checkChooserStatus(NeededByChooser, ssp1, buffer) + buffer.flush + checkChooserStatus(SkippingChooser, ssp1, buffer) + + // Now check that we can get back to NeededByChooser when a new message + // arrives. + buffer.update(envelope1) + checkChooserStatus(NeededByChooser, ssp1, buffer) + assertEquals(0, chooser.envelopes.size) + assertEquals(null, buffer.choose) + + // And check that we can get back into the InChooser state. + buffer.flush + checkChooserStatus(InChooser, ssp1, buffer) + assertEquals(envelope1, buffer.choose) + checkChooserStatus(NeededByChooser, ssp1, buffer) + + // Shutdown. + buffer.stop + assertEquals(1, chooser.stops) + } + + private def checkChooserStatus(status: ChooserStatus, systemStreamPartition: SystemStreamPartition, buffer: BufferingMessageChooser) { + if (status.equals(NeededByChooser)) { + assertEquals(Set(systemStreamPartition), buffer.neededByChooser) + } else { + assertTrue(!buffer.neededByChooser.contains(systemStreamPartition)) + } + + assertEquals(status, buffer.statuses.getOrElse(systemStreamPartition, null)) + } +} \ No newline at end of file
