Updated Branches:
  refs/heads/master 4dec9e736 -> ba8ed30d5

SAMZA-120; switch BlockingEnvelopeMap to use put instead of add on blocking 
queues.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ba8ed30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ba8ed30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ba8ed30d

Branch: refs/heads/master
Commit: ba8ed30d564bac80fdeb7a24433174aee383e6ff
Parents: 4dec9e7
Author: Chris Riccomini <cricc...@criccomi-ld.linkedin.biz>
Authored: Tue Jan 7 12:21:00 2014 -0800
Committer: Chris Riccomini <cricc...@criccomi-ld.linkedin.biz>
Committed: Tue Jan 7 12:21:00 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/samza/util/BlockingEnvelopeMap.java     | 9 ++++-----
 .../java/org/apache/samza/util/TestBlockingEnvelopeMap.java | 8 ++++----
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +-
 .../org/apache/samza/system/mock/MockSystemConsumer.java    | 2 +-
 4 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index a7c32bc..ab4a48f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -27,7 +27,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -152,15 +151,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return messagesToReturn;
   }
 
-  protected void add(SystemStreamPartition systemStreamPartition, 
IncomingMessageEnvelope envelope) {
-    bufferedMessages.get(systemStreamPartition).add(envelope);
+  protected void put(SystemStreamPartition systemStreamPartition, 
IncomingMessageEnvelope envelope) throws InterruptedException {
+    bufferedMessages.get(systemStreamPartition).put(envelope);
   }
 
-  protected void addAll(SystemStreamPartition systemStreamPartition, 
List<IncomingMessageEnvelope> envelopes) {
+  protected void putAll(SystemStreamPartition systemStreamPartition, 
List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
     BlockingQueue<IncomingMessageEnvelope> queue = 
bufferedMessages.get(systemStreamPartition);
 
     for (IncomingMessageEnvelope envelope : envelopes) {
-      queue.add(envelope);
+      queue.put(envelope);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index e8c2e32..cb4d148 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -64,11 +64,11 @@ public class TestBlockingEnvelopeMap {
   public void testShouldGetSomeMessages() throws InterruptedException {
     BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
     map.register(SSP, "0");
-    map.add(SSP, envelope);
+    map.put(SSP, envelope);
     List<IncomingMessageEnvelope> envelopes = map.poll(FETCH, 0);
     assertEquals(1, envelopes.size());
-    map.add(SSP, envelope);
-    map.add(SSP, envelope);
+    map.put(SSP, envelope);
+    map.put(SSP, envelope);
     envelopes = map.poll(FETCH, 0);
     assertEquals(2, envelopes.size());
   }
@@ -81,7 +81,7 @@ public class TestBlockingEnvelopeMap {
     map.register(SSP, "0");
 
     for (int i = 0; i < 3 * maxMessages; ++i) {
-      map.add(SSP, envelope);
+      map.put(SSP, envelope);
     }
 
     assertEquals(3 * maxMessages, map.getNumMessagesInQueue(SSP));

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 3ee4068..33826d2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -171,7 +171,7 @@ private[kafka] class KafkaSystemConsumer(
         null
       }
 
-      add(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+      put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
 
       setIsAtHead(systemStreamPartition, isAtHead)
     }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ba8ed30d/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
index c0791d7..1e3457b 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -160,7 +160,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap {
           // Add messages to the BlockingEnvelopeMap.
           for (SystemStreamPartition ssp : sspsToFetch) {
             for (int i = 0; i < messagesPerBatch; ++i) {
-              add(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
+              put(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
             }
           }
         }

Reply via email to