Author: robbie Date: Thu Aug 28 16:34:09 2014 New Revision: 1621161 URL: http://svn.apache.org/r1621161 Log: QPIDJMS-26: update message receiver to use delivery events to provoke initial processing
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1621161&r1=1621160&r2=1621161&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Thu Aug 28 16:34:09 2014 @@ -20,6 +20,9 @@ */ package org.apache.qpid.jms.engine; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; + import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; @@ -29,6 +32,9 @@ public class AmqpReceiver extends AmqpLi private Receiver _protonReceiver; private byte[] _buffer = new byte[1024]; + //TODO: custom queue with timeout based retrieval + Deque<AmqpMessage> _messages = new ConcurrentLinkedDeque<AmqpMessage>(); + public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver, AmqpConnection amqpConnection) { super(amqpSession, protonReceiver, amqpConnection); @@ -45,55 +51,58 @@ public class AmqpReceiver extends AmqpLi public AmqpMessage receiveNoWait() { - synchronized (getAmqpConnection()) + return _messages.pollFirst(); + } + + @Override + void processDeliveryUpdate(Delivery delivery) + { + //TODO: this is currently processing all messages for the link, should really just do the one given. + // We can't call recv if the passed delivery is not the 'current', but cant throw the event away either (could be a before-complete disposition change?) + // Doesnt handle settlement yet. + + Delivery currentDelivery = _protonReceiver.current(); + if(currentDelivery != null) { - Delivery currentDelivery = _protonReceiver.current(); - if(currentDelivery != null) + if(currentDelivery.getContext() == null) { - if(currentDelivery.getContext() == null) + if (currentDelivery.isReadable() && !currentDelivery.isPartial()) { - if (currentDelivery.isReadable() && !currentDelivery.isPartial()) + int total = 0; + int start = 0; + while (true) { - - int total = 0; - int start = 0; - while (true) + int read = _protonReceiver.recv(_buffer, start, _buffer.length - start); + total += read; + if (read == (_buffer.length - start)) { - int read = _protonReceiver.recv(_buffer, start, _buffer.length - start); - total += read; - if (read == (_buffer.length - start)) - { - //may need to expand the buffer (is there a better test?) - byte[] old = _buffer; - _buffer = new byte[_buffer.length*2]; - System.arraycopy(old, 0, _buffer, 0, old.length); - start += read; - } - else - { - break; - } + //may need to expand the buffer (is there a better test?) + byte[] old = _buffer; + _buffer = new byte[_buffer.length*2]; + System.arraycopy(old, 0, _buffer, 0, old.length); + start += read; } + else + { + break; + } + } - Message message = Message.Factory.create(); - message.decode(_buffer, 0, total); + Message message = Message.Factory.create(); + message.decode(_buffer, 0, total); - //TODO: dont create a new factory for every message - AmqpMessage amqpMessage = new AmqpMessageFactory().createAmqpMessage(currentDelivery, message, getAmqpConnection()); - currentDelivery.setContext(amqpMessage); - _protonReceiver.advance(); - return amqpMessage; - } + //TODO: dont create a new factory for every message + AmqpMessage amqpMessage = new AmqpMessageFactory().createAmqpMessage(currentDelivery, message, getAmqpConnection()); + currentDelivery.setContext(amqpMessage); + _protonReceiver.advance(); + _messages.add(amqpMessage); } } + else + { + //TODO: previously processed this message. Updated disposition info? + } } - return null; - } - - @Override - void processDeliveryUpdate(Delivery delivery) - { - //TODO: implement receiver delivery update event processing } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org