Author: robbie
Date: Thu Aug 28 16:34:09 2014
New Revision: 1621161

URL: http://svn.apache.org/r1621161
Log:
QPIDJMS-26: update message receiver to use delivery events to provoke initial 
processing

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1621161&r1=1621160&r2=1621161&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java 
Thu Aug 28 16:34:09 2014
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.jms.engine;
 
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
@@ -29,6 +32,9 @@ public class AmqpReceiver extends AmqpLi
     private Receiver _protonReceiver;
     private byte[] _buffer = new byte[1024];
 
+    //TODO: custom queue with timeout based retrieval
+    Deque<AmqpMessage> _messages = new ConcurrentLinkedDeque<AmqpMessage>();
+
     public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver, 
AmqpConnection amqpConnection)
     {
         super(amqpSession, protonReceiver, amqpConnection);
@@ -45,55 +51,58 @@ public class AmqpReceiver extends AmqpLi
 
     public AmqpMessage receiveNoWait()
     {
-        synchronized (getAmqpConnection())
+        return _messages.pollFirst();
+    }
+
+    @Override
+    void processDeliveryUpdate(Delivery delivery)
+    {
+        //TODO: this is currently processing all messages for the link, should 
really just do the one given.
+        // We can't call recv if the passed delivery is not the 'current', but 
cant throw the event away either (could be a before-complete disposition 
change?)
+        // Doesnt handle settlement yet.
+
+        Delivery currentDelivery = _protonReceiver.current();
+        if(currentDelivery != null)
         {
-            Delivery currentDelivery = _protonReceiver.current();
-            if(currentDelivery != null)
+            if(currentDelivery.getContext() == null)
             {
-                if(currentDelivery.getContext() == null)
+                if (currentDelivery.isReadable() && 
!currentDelivery.isPartial())
                 {
-                    if (currentDelivery.isReadable() && 
!currentDelivery.isPartial())
+                    int total = 0;
+                    int start = 0;
+                    while (true)
                     {
-
-                        int total = 0;
-                        int start = 0;
-                        while (true)
+                        int read = _protonReceiver.recv(_buffer, start, 
_buffer.length - start);
+                        total += read;
+                        if (read == (_buffer.length - start))
                         {
-                            int read = _protonReceiver.recv(_buffer, start, 
_buffer.length - start);
-                            total += read;
-                            if (read == (_buffer.length - start))
-                            {
-                                //may need to expand the buffer (is there a 
better test?)
-                                byte[] old = _buffer;
-                                _buffer = new byte[_buffer.length*2];
-                                System.arraycopy(old, 0, _buffer, 0, 
old.length);
-                                start += read;
-                            }
-                            else
-                            {
-                                break;
-                            }
+                            //may need to expand the buffer (is there a better 
test?)
+                            byte[] old = _buffer;
+                            _buffer = new byte[_buffer.length*2];
+                            System.arraycopy(old, 0, _buffer, 0, old.length);
+                            start += read;
                         }
+                        else
+                        {
+                            break;
+                        }
+                    }
 
-                        Message message = Message.Factory.create();
-                        message.decode(_buffer, 0, total);
+                    Message message = Message.Factory.create();
+                    message.decode(_buffer, 0, total);
 
-                        //TODO: dont create a new factory for every message
-                        AmqpMessage amqpMessage = new 
AmqpMessageFactory().createAmqpMessage(currentDelivery, message, 
getAmqpConnection());
-                        currentDelivery.setContext(amqpMessage);
-                        _protonReceiver.advance();
-                        return amqpMessage;
-                    }
+                    //TODO: dont create a new factory for every message
+                    AmqpMessage amqpMessage = new 
AmqpMessageFactory().createAmqpMessage(currentDelivery, message, 
getAmqpConnection());
+                    currentDelivery.setContext(amqpMessage);
+                    _protonReceiver.advance();
+                    _messages.add(amqpMessage);
                 }
             }
+            else
+            {
+                //TODO: previously processed this message. Updated disposition 
info?
+            }
         }
-        return null;
-    }
-
-    @Override
-    void processDeliveryUpdate(Delivery delivery)
-    {
-        //TODO: implement receiver delivery update event processing
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to