Updated Branches:
  refs/heads/master 4dec9e736 -> ba8ed30d5

SAMZA-120; switch BlockingEnvelopeMap to use put instead of add on blocking queues.

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ba8ed30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ba8ed30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ba8ed30d

Branch: refs/heads/master
Commit: ba8ed30d564bac80fdeb7a24433174aee383e6ff
Parents: 4dec9e7
Author: Chris Riccomini <cricc...@criccomi-ld.linkedin.biz>
Authored: Tue Jan 7 12:21:00 2014 -0800
Committer: Chris Riccomini <cricc...@criccomi-ld.linkedin.biz>
Committed: Tue Jan 7 12:21:00 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/samza/util/BlockingEnvelopeMap.java | 9 ++++-----
 .../java/org/apache/samza/util/TestBlockingEnvelopeMap.java | 8 ++++----
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +-
 .../org/apache/samza/system/mock/MockSystemConsumer.java | 2 +-
 4 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index a7c32bc..ab4a48f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -27,7 +27,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -152,15 +151,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return messagesToReturn;
   }

-  protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
-    bufferedMessages.get(systemStreamPartition).add(envelope);
+  protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
+    bufferedMessages.get(systemStreamPartition).put(envelope);
   }

-  protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) {
+  protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
     BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);

     for (IncomingMessageEnvelope envelope : envelopes) {
-      queue.add(envelope);
+      queue.put(envelope);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index e8c2e32..cb4d148 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -64,11 +64,11 @@ public class TestBlockingEnvelopeMap {
   public void testShouldGetSomeMessages() throws InterruptedException {
     BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
     map.register(SSP, "0");
-    map.add(SSP, envelope);
+    map.put(SSP, envelope);
     List<IncomingMessageEnvelope> envelopes = map.poll(FETCH, 0);
     assertEquals(1, envelopes.size());
-    map.add(SSP, envelope);
-    map.add(SSP, envelope);
+    map.put(SSP, envelope);
+    map.put(SSP, envelope);
     envelopes = map.poll(FETCH, 0);
     assertEquals(2, envelopes.size());
   }
@@ -81,7 +81,7 @@ public class TestBlockingEnvelopeMap {
     map.register(SSP, "0");

     for (int i = 0; i < 3 * maxMessages; ++i) {
-      map.add(SSP, envelope);
+      map.put(SSP, envelope);
     }

     assertEquals(3 * maxMessages, map.getNumMessagesInQueue(SSP));

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 3ee4068..33826d2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -171,7 +171,7 @@ private[kafka] class KafkaSystemConsumer(
           null
         }

-        add(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
         setIsAtHead(systemStreamPartition, isAtHead)
       }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
index c0791d7..1e3457b 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -160,7 +160,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap {
           // Add messages to the BlockingEnvelopeMap.
           for (SystemStreamPartition ssp : sspsToFetch) {
             for (int i = 0; i < messagesPerBatch; ++i) {
-              add(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
+              put(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
             }
           }
         }