SAMZA-775: add consumer size-based fetch threshold
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72a558ce Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72a558ce Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72a558ce Branch: refs/heads/samza-sql Commit: 72a558cebe50d98b5ef6a566b6cd8607b7032d96 Parents: acd340e Author: Monal Daxini <[email protected]> Authored: Tue Nov 24 15:50:35 2015 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Nov 24 15:50:35 2015 -0800 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 25 +++++++ .../samza/system/IncomingMessageEnvelope.java | 19 +++++ .../apache/samza/util/BlockingEnvelopeMap.java | 66 +++++++++++++++-- .../samza/util/TestBlockingEnvelopeMap.java | 46 ++++++++++-- .../org/apache/samza/config/KafkaConfig.scala | 12 ++++ .../system/kafka/KafkaSystemConsumer.scala | 76 +++++++++++++++++--- .../samza/system/kafka/KafkaSystemFactory.scala | 3 + .../apache/samza/config/TestKafkaConfig.scala | 10 +++ .../system/kafka/TestKafkaSystemConsumer.scala | 70 ++++++++++++++++++ 9 files changed, 308 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 09f2b6f..96fdcc0 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -882,6 +882,31 @@ </tr> <tr> + <td class="property" id="systems-samza-fetch-threshold-bytes">systems.<span class="system">system-name</span>.<br>samza.fetch.threshold.bytes</td> + <td 
class="default">-1</td> + <td class="description"> + When consuming streams from Kafka, a Samza container maintains an in-memory buffer + for incoming messages in order to increase throughput (the stream task can continue + processing buffered messages while new messages are fetched from Kafka). This + parameter determines the total size, in bytes, of the messages we aim to buffer across all stream + partitions consumed by a container. It defines how many bytes to use for the buffered + prefetch messages for the job as a whole. The bytes for a single system/stream/partition are computed based on this. + Messages are always fetched whole, hence this byte limit is a soft one, and the actual usage can be the byte + limit + size of max message in the partition for a given stream. If the value of this property is &gt; 0 + then this takes precedence over systems.<span class="system">system-name</span>.samza.fetch.threshold.<br> + For example, if fetchThresholdBytes is set to 100000 bytes, and there are 50 SystemStreamPartitions registered, + then the per-partition threshold is (100000 / 2) / 50 = 1000 bytes (the total is halved because messages are buffered twice). As this is a soft limit, the actual usage + can be 1000 bytes + size of max message. As soon as a SystemStreamPartition's buffered message bytes drop + below 1000, a fetch request will be executed to get more data for it. + + Increasing this parameter will decrease the latency between when a queue is drained of messages and when new + messages are enqueued, but also leads to an increase in memory usage since more messages will be held in memory. + + The default value is -1, which means this is not used. 
+ </td> + </tr> + + <tr> <td class="property" id="task-checkpoint-system">task.checkpoint.system</td> <td class="default"></td> <td class="description"> http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index 4b14312..cc860cf 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -28,6 +28,7 @@ public class IncomingMessageEnvelope { private final String offset; private final Object key; private final Object message; + private final int size; /** * Constructs a new IncomingMessageEnvelope from specified components. @@ -38,10 +39,24 @@ public class IncomingMessageEnvelope { * @param message A deserialized message received from the partition offset. */ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message) { + this(systemStreamPartition, offset, key, message, 0); + } + + /** + * Constructs a new IncomingMessageEnvelope from specified components. + * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster + * from which the stream came, and the partition of the stream from which the message was received. + * @param offset The offset in the partition that the message was received from. + * @param key A deserialized key received from the partition offset. + * @param message A deserialized message received from the partition offset. + * @param size size of the message and key in bytes. 
+ */ + public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) { this.systemStreamPartition = systemStreamPartition; this.offset = offset; this.key = key; this.message = message; + this.size = size; } public SystemStreamPartition getSystemStreamPartition() { @@ -60,6 +75,10 @@ public class IncomingMessageEnvelope { return message; } + public int getSize() { + return size; + } + @Override public int hashCode() { final int prime = 31; http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java index 7dd99fb..8238d2e 100644 --- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java +++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java @@ -37,6 +37,8 @@ import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemStreamPartition; +import java.util.concurrent.atomic.AtomicLong; + /** * <p> * BlockingEnvelopeMap is a helper class for SystemConsumer implementations. 
@@ -63,17 +65,15 @@ import org.apache.samza.system.SystemStreamPartition; public abstract class BlockingEnvelopeMap implements SystemConsumer { private final BlockingEnvelopeMapMetrics metrics; private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages; + private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition private final Map<SystemStreamPartition, Boolean> noMoreMessage; private final Clock clock; + protected final boolean fetchLimitByBytesEnabled; public BlockingEnvelopeMap() { this(new NoOpMetricsRegistry()); } - public BlockingEnvelopeMap(Clock clock) { - this(new NoOpMetricsRegistry(), clock); - } - public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) { this(metricsRegistry, new Clock() { public long currentTimeMillis() { @@ -83,15 +83,18 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { } public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) { - this(metricsRegistry, clock, null); + this(metricsRegistry, clock, null, false); } - public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) { + public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) { metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName; this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry); this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>(); this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>(); this.clock = clock; + this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled; + // Created when size is disabled for code simplification, and as the overhead is negligible. 
+ this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>(); } /** @@ -100,6 +103,8 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { public void register(SystemStreamPartition systemStreamPartition, String offset) { metrics.initMetrics(systemStreamPartition); bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue()); + // Created when size is disabled for code simplification, and the overhead is negligible. + bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0)); } protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() { @@ -150,12 +155,24 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { if (outgoingList.size() > 0) { messagesToReturn.put(systemStreamPartition, outgoingList); + if (fetchLimitByBytesEnabled) { + subtractSizeOnQDrain(systemStreamPartition, outgoingList); + } } } return messagesToReturn; } + private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> outgoingList) { + long outgoingListBytes = 0; + for (IncomingMessageEnvelope envelope : outgoingList) { + outgoingListBytes += envelope.getSize(); + } + // subtract the size of the messages dequeued. + bufferedMessagesSize.get(systemStreamPartition).addAndGet(-1 * outgoingListBytes); + } + /** * Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the * queue for the specified {@link org.apache.samza.system.SystemStreamPartition}. 
@@ -166,6 +183,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { */ protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException { bufferedMessages.get(systemStreamPartition).put(envelope); + if (fetchLimitByBytesEnabled) { + bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize()); + } } /** @@ -198,6 +218,16 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { } } + public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) { + AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition); + + if (sizeInBytes == null) { + throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered. or fetch"); + } else { + return sizeInBytes.get(); + } + } + protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) { metrics.setNoMoreMessages(systemStreamPartition, isAtHead); return noMoreMessage.put(systemStreamPartition, isAtHead); @@ -232,6 +262,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition)); metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition)); + if (fetchLimitByBytesEnabled) { + metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition)); + } } public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) { @@ -271,4 +304,25 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { return envelopes.size(); } } + + public class BufferSizeGauge extends Gauge<Long> { + private final SystemStreamPartition 
systemStreamPartition; + + public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) { + super(name, 0L); + + this.systemStreamPartition = systemStreamPartition; + } + + @Override + public Long getValue() { + AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition); + + if (sizeInBytes == null) { + return 0L; + } + + return sizeInBytes.get(); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java index d1a0a82..afdae16 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java @@ -40,7 +40,13 @@ import org.junit.Test; public class TestBlockingEnvelopeMap { private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0)); private static final IncomingMessageEnvelope ENVELOPE = new IncomingMessageEnvelope(SSP, null, null, null); + private static final IncomingMessageEnvelope ENVELOPE_WITH_SIZE = new IncomingMessageEnvelope(SSP, null, null, null, 100); private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>(); + private static final Clock CLOCK = new Clock() { + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + }; static { FETCH.add(SSP); @@ -78,6 +84,35 @@ public class TestBlockingEnvelopeMap { envelopes = map.poll(FETCH, 0); assertEquals(1, envelopes.size()); assertEquals(2, envelopes.get(SSP).size()); + + // Size info. 
+ assertEquals(0, map.getMessagesSizeInQueue(SSP)); + } + + @Test + public void testNoSizeComputation() throws InterruptedException { + BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(); + map.register(SSP, "0"); + map.put(SSP, ENVELOPE); + map.put(SSP, ENVELOPE); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0); + + // Size info. + assertEquals(0, map.getMessagesSizeInQueue(SSP)); + } + + @Test + public void testSizeComputation() throws InterruptedException { + BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(true); + map.register(SSP, "0"); + map.put(SSP, ENVELOPE_WITH_SIZE); + map.put(SSP, ENVELOPE_WITH_SIZE); + + // Size info. + assertEquals(200, map.getMessagesSizeInQueue(SSP)); + + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0); + assertEquals(0, map.getMessagesSizeInQueue(SSP)); } @Test @@ -177,12 +212,13 @@ public class TestBlockingEnvelopeMap { this(null); } + public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) { + super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled); + injectedQueue = new MockQueue(); + } + public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue) { - this(injectedQueue, new Clock() { - public long currentTimeMillis() { - return System.currentTimeMillis(); - } - }); + this(injectedQueue, CLOCK); } public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue, Clock clock) { http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index a65e8e8..1822511 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -60,6 +60,15 @@ object KafkaConfig { val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400 + /** + * Defines how many bytes to use for the buffered prefetch messages for the job as a whole. + * The bytes for a single system/stream/partition are computed based on this. + * Messages are always fetched whole, hence this byte limit is a soft one, and the actual usage can be + * the byte limit + size of max message in the partition for a given stream. + * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config. + */ + val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes" + implicit def Config2Kafka(config: Config) = new KafkaConfig(config) } @@ -70,6 +79,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) // custom consumer config def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) + def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name) + def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0 + /** * Returns a map of topic -> fetch.message.max.bytes value for all streams that http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index c948d64..b373753 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -21,6 +21,7 @@ package org.apache.samza.system.kafka import kafka.common.TopicAndPartition import org.apache.samza.util.Logging +import kafka.message.Message import kafka.message.MessageAndOffset import org.apache.samza.Partition import kafka.utils.Utils @@ -32,10 +33,7 @@ import org.apache.samza.util.BlockingEnvelopeMap import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope import kafka.consumer.ConsumerConfig -import org.apache.samza.util.ExponentialSleepStrategy -import org.apache.samza.SamzaException import org.apache.samza.util.TopicMetadataStore -import org.apache.samza.util.ExponentialSleepStrategy import kafka.api.TopicMetadata import org.apache.samza.util.ExponentialSleepStrategy import java.util.concurrent.ConcurrentHashMap @@ -43,6 +41,13 @@ import scala.collection.JavaConversions._ import org.apache.samza.system.SystemAdmin object KafkaSystemConsumer { + + // Approximate additional shallow heap overhead per message in addition to the raw bytes + // received from Kafka 4 + 64 + 4 + 4 + 4 = 80 bytes overhead. + // As this overhead is a moving target, and not very large + // compared to the message size its being ignore in the computation for now. + val MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4; + def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { val topic = systemStreamPartition.getStream val partitionId = systemStreamPartition.getPartition.getPartitionId @@ -80,22 +85,65 @@ private[kafka] class KafkaSystemConsumer( * to an increase in memory usage since more messages will be held in memory. */ fetchThreshold: Int = 50000, + /** + * Defines a low water mark for how many bytes we buffer before we start + * executing fetch requests against brokers to get more messages. This + * value is divided by 2 because the messages are buffered twice, once in + * KafkaConsumer and then in SystemConsumers. 
This value + * is divided equally among all registered SystemStreamPartitions. + * However this is a soft limit per partition, as the + * bytes are cached at the message boundaries, and the actual usage can be + * the per-partition threshold + size of max message in the partition for a given stream. + * The bytes counted are the size of the byte buffer in Message. Hence, the + * Object overhead is not taken into consideration. In this codebase + * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB, + * which is not significant. + * + * For example, + * if fetchThresholdBytes is set to 100000 bytes, and there are 50 + * SystemStreamPartitions registered, then the per-partition threshold is + * (100000 / 2) / 50 = 1000 bytes. + * As this is a soft limit, the actual usage can be 1000 bytes + size of max message. + * As soon as a SystemStreamPartition's buffered message bytes drop + * below 1000, a fetch request will be executed to get more data for it. + * + * Increasing this parameter will decrease the latency between when a queue + * is drained of messages and when new messages are enqueued, but also leads + * to an increase in memory usage since more messages will be held in memory. + * + * The default value is -1, which means this is not used. When the value + * is > 0, then the fetchThreshold which is count based is ignored. 
+ */ + fetchThresholdBytes: Long = -1, + /** + * if(fetchThresholdBytes > 0) true else false + */ + fetchLimitByBytesEnabled: Boolean = false, offsetGetter: GetOffset = new GetOffset("fail"), deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock { - def currentTimeMillis = clock() -}, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging { + clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap( + metrics.registry, + new Clock { + def currentTimeMillis = clock() + }, + classOf[KafkaSystemConsumerMetrics].getName, + fetchLimitByBytesEnabled) with Toss with Logging { type HostPort = (String, Int) val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]() val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]() var perPartitionFetchThreshold = fetchThreshold + var perPartitionFetchThresholdBytes = 0L def start() { if (topicPartitionsAndOffsets.size > 0) { perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size + // messages get double buffered, hence divide by 2 + if(fetchLimitByBytesEnabled) { + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size + } } refreshBrokers @@ -202,7 +250,15 @@ private[kafka] class KafkaSystemConsumer( } def needsMoreMessages(tp: TopicAndPartition) = { - getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold + if(fetchLimitByBytesEnabled) { + getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes + } else { + getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold + } + } + 
+ def getMessageSize(message: Message): Integer = { + message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD } def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = { @@ -222,7 +278,11 @@ private[kafka] class KafkaSystemConsumer( null } - put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)) + if(fetchLimitByBytesEnabled ) { + put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))) + } else { + put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)) + } setIsAtHead(systemStreamPartition, isAtHead) } http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index a60cda2..b574176 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -62,6 +62,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { val autoOffsetResetDefault = consumerConfig.autoOffsetReset val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt + val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout) @@ -77,6 +78,8 @@ class KafkaSystemFactory extends SystemFactory with Logging { consumerMinSize = consumerMinSize, 
consumerMaxWait = consumerMaxWait, fetchThreshold = fetchThreshold, + fetchThresholdBytes = fetchThresholdBytes, + fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName), offsetGetter = offsetGetter) } http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 85badf9..c4a83f6 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -108,6 +108,16 @@ class TestKafkaConfig { val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME) // topic fetch size assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024)) + + // default samza.fetch.threshold.bytes + val mapConfig3 = new MapConfig(props.toMap[String, String]) + val kafkaConfig3 = new KafkaConfig(mapConfig3) + assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty) + + props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536") + val mapConfig4 = new MapConfig(props.toMap[String, String]) + val kafkaConfig4 = new KafkaConfig(mapConfig4) + assertEquals("65536", kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get) } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala index 23fa939..ece0359 100644 --- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -23,6 +23,10 @@ import kafka.api.TopicMetadata import kafka.api.PartitionMetadata import kafka.cluster.Broker import kafka.common.TopicAndPartition +import kafka.message.Message +import kafka.message.MessageAndOffset + +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition import org.apache.samza.util.TopicMetadataStore @@ -34,6 +38,9 @@ import org.mockito.Matchers._ class TestKafkaSystemConsumer { val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin]) + private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0)) + private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null) + private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100) @Test def testFetchThresholdShouldDivideEvenlyAmongPartitions { @@ -114,6 +121,69 @@ class TestKafkaSystemConsumer { assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1))) assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2))) } + + @Test + def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions { + val metadataStore = new MockMetadataStore + val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, + fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) { + override def refreshBrokers { + } + } + + for (i <- 0 until 10) { + consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") + } + + consumer.start + + assertEquals(5000, consumer.perPartitionFetchThreshold) + assertEquals(3000, 
consumer.perPartitionFetchThresholdBytes) + } + + @Test + def testFetchThresholdBytes { + val metadataStore = new MockMetadataStore + val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, + fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) { + override def refreshBrokers { + } + } + + for (i <- 0 until 10) { + consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") + } + + consumer.start + + val msg = Array[Byte](5, 112, 9, 126) + val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654) + // 4 data + 14 Message overhead + 80 IncomingMessageEnvelope overhead + consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354) + + assertEquals(98, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) + } + + + @Test + def testFetchThresholdBytesDisabled { + val metadataStore = new MockMetadataStore + val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, + fetchThreshold = 50000, fetchThresholdBytes = 60000L) { + override def refreshBrokers { + } + } + + for (i <- 0 until 10) { + consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") + } + + consumer.start + + assertEquals(5000, consumer.perPartitionFetchThreshold) + assertEquals(0, consumer.perPartitionFetchThresholdBytes) + assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) + } } class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
