Author: rgodfrey Date: Wed May 30 16:39:47 2012 New Revision: 1344343 URL: http://svn.apache.org/viewvc?rev=1344343&view=rev Log: proton-j : get disposition working
Added: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java Modified: qpid/proton/trunk/proton-j/jproton.py qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java Modified: qpid/proton/trunk/proton-j/jproton.py URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/jproton.py?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/jproton.py (original) +++ qpid/proton/trunk/proton-j/jproton.py Wed May 30 16:39:47 2012 @@ -197,3 +197,6 @@ def pn_remote_disp(d): def pn_local_disp(d): if(d.getLocalState() == Accepted.getInstance()): return PN_ACCEPTED + +def pn_settle(d): + d.settle() Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Wed May 30 16:39:47 2012 @@ -53,6 +53,8 @@ public class DeliveryImpl implements Del private byte[] _data; private int _dataSize; private boolean _complete; + private boolean _updated; + private boolean _done; public DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous) { @@ -104,6 +106,7 @@ public class DeliveryImpl implements Del public void settle() { _settled = true; + setTransportFlag(DELIVERY_STATE_CHANGED); } DeliveryImpl getLinkNext() @@ -388,4 +391,26 @@ public class DeliveryImpl implements Del { _complete = true; } + + void setRemoteDeliveryState(DeliveryState remoteDeliveryState) + { + _remoteDeliveryState = remoteDeliveryState; + _updated = true; + } + + public boolean isUpdated() + { + return _updated; + } + + + void setDone() + { + _done = true; + } + + boolean isDone() + { + return _done; + } } Added: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java?rev=1344343&view=auto ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java (added) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java Wed May 30 16:39:47 2012 @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.engine.impl; + +import org.apache.qpid.proton.engine.Accepted; + +class DeliveryStateConverter +{ + + public static org.apache.qpid.proton.engine.DeliveryState convert(org.apache.qpid.proton.type.transport.DeliveryState state) + { + if(state instanceof org.apache.qpid.proton.type.messaging.Accepted) + { + return Accepted.getInstance(); + } + return null; + } +} Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java Wed May 30 16:39:47 2012 @@ -116,6 +116,7 @@ public abstract class LinkImpl extends E { if(_current != null ) { + _current.setDone(); DeliveryImpl oldCurrent = _current; _current = _current.getLinkNext(); getConnectionImpl().workUpdate(oldCurrent); Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed May 30 16:39:47 2012 @@ -107,7 +107,6 @@ public class TransportImpl extends Endpo public int output(byte[] bytes, final int offset, final int size) { - int written = 0; written += processHeader(bytes, offset); @@ -189,7 +188,38 @@ public class TransportImpl extends Endpo private int processSenderDisposition(byte[] bytes, int offset, int length) { - return 0; //TODO - Implement + DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead(); + int written = 0; + while(delivery != null && length >= _maxFrameSize) + { + if((delivery.getLink() instanceof SenderImpl) && delivery.isLocalStateChange()) + { + TransportDelivery transportDelivery = delivery.getTransportDelivery(); + Disposition disposition = new Disposition(); + disposition.setFirst(transportDelivery.getDeliveryId()); + disposition.setLast(transportDelivery.getDeliveryId()); + disposition.setRole(Role.SENDER); + disposition.setSettled(delivery.isSettled()); + DeliveryState deliveryState = delivery.getLocalState(); + if(deliveryState == Accepted.getInstance()) + { + disposition.setState(ACCEPTED); + } + else + { + // TODO + } + int frameBytes = writeFrame(bytes, offset, length, delivery.getLink().getSession() + .getTransportSession().getLocalChannel(), + disposition, null); + written += frameBytes; + offset += frameBytes; + length -= frameBytes; + } + delivery = delivery.getTransportWorkNext(); + } + return written; + } private int processMessageData(byte[] bytes, int offset, int length) @@ -198,7 +228,7 @@ public class TransportImpl extends Endpo int written = 0; while(delivery != null && length >= _maxFrameSize) { - if((delivery.getLink() instanceof SenderImpl)) + if((delivery.getLink() instanceof SenderImpl) && !(delivery.isDone() && delivery.getDataLength() == 0)) { SenderImpl sender = (SenderImpl) delivery.getLink(); @@ -208,7 +238,8 @@ public class TransportImpl extends Endpo UnsignedInteger deliveryId = transportLink.getDeliveryCount(); TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportLink); - + delivery.setTransportDelivery(transportDelivery); + sender.getSession().getTransportSession().addUnsettledOutgoing(deliveryId, delivery); Transfer transfer = new Transfer(); transfer.setDeliveryId(deliveryId); Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java Wed May 30 16:39:47 2012 @@ -49,7 +49,10 @@ class TransportSession private UnsignedInteger _remoteOutgoingWindow; private UnsignedInteger _remoteNextIncomingId; private UnsignedInteger _remoteNextOutgoingId; - private Map<UnsignedInteger, DeliveryImpl> _unsettledDeliveriesById = new HashMap<UnsignedInteger, DeliveryImpl>(); + private Map<UnsignedInteger, DeliveryImpl> + _unsettledIncomingDeliveriesById = new HashMap<UnsignedInteger, DeliveryImpl>(); + private Map<UnsignedInteger, DeliveryImpl> + _unsettledOutgoingDeliveriesById = new HashMap<UnsignedInteger, DeliveryImpl>(); public TransportSession(SessionImpl session) { @@ -188,7 +191,7 @@ class TransportSession TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle()); ReceiverImpl receiver = transportReceiver.getReceiver(); Binary deliveryTag = transfer.getDeliveryTag(); - delivery = _unsettledDeliveriesById.get(_currentDeliveryId); + delivery = _unsettledIncomingDeliveriesById.get(_currentDeliveryId); } @@ -204,7 +207,7 @@ class TransportSession deliveryTag.getLength()); TransportDelivery transportDelivery = new TransportDelivery(_currentDeliveryId, delivery, transportReceiver); delivery.setTransportDelivery(transportDelivery); - _unsettledDeliveriesById.put(_currentDeliveryId, delivery); + _unsettledIncomingDeliveriesById.put(_currentDeliveryId, delivery); } @@ -270,6 +273,29 @@ class TransportSession void handleDisposition(Disposition disposition) { + UnsignedInteger id = disposition.getFirst(); + UnsignedInteger last = disposition.getLast() == null ? id : disposition.getLast(); + final Map<UnsignedInteger, DeliveryImpl> unsettledDeliveries = + disposition.getRole() ? _unsettledOutgoingDeliveriesById + : _unsettledIncomingDeliveriesById; + + while(id.compareTo(last)<=0) + { + DeliveryImpl delivery = unsettledDeliveries.get(id); + if(delivery != null) + { + if(disposition.getState() != null) + { + delivery.setRemoteDeliveryState(DeliveryStateConverter.convert(disposition.getState())); + } + } + id = id.add(UnsignedInteger.ONE); + } //TODO - Implement. } + + void addUnsettledOutgoing(UnsignedInteger deliveryId, DeliveryImpl delivery) + { + _unsettledOutgoingDeliveriesById.put(deliveryId, delivery); + } } Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java Wed May 30 16:39:47 2012 @@ -36,13 +36,13 @@ public class Accepted { private static final Object[] DESCRIPTORS = { - UnsignedLong.valueOf(0x0000000000000024L), Symbol.valueOf("amqp:accepted:list"), + UnsignedLong.valueOf(0x0000000000000024L), Symbol.valueOf("amqp:accepted:list"), }; private static final UnsignedLong DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000024L); private final AcceptedWrapper _wrapper = new AcceptedWrapper(); - - + + public Object getDescriptor() { return DESCRIPTOR; @@ -52,7 +52,7 @@ public class Accepted { return _wrapper; } - + public Object get(final int index) { @@ -62,7 +62,7 @@ public class Accepted public int size() { - return 0; + return 0; } @@ -110,5 +110,10 @@ public class Accepted decoder.register(descriptor, constructor); } } + + @Override + public String toString() + { + return "Accepted{}"; + } } - \ No newline at end of file Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java?rev=1344343&r1=1344342&r2=1344343&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java Wed May 30 16:39:47 2012 @@ -37,12 +37,12 @@ public class Disposition { private static final Object[] DESCRIPTORS = { - UnsignedLong.valueOf(0x0000000000000015L), Symbol.valueOf("amqp:disposition:list"), + UnsignedLong.valueOf(0x0000000000000015L), Symbol.valueOf("amqp:disposition:list"), }; private static final UnsignedLong DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000015L); private final DispositionWrapper _wrapper = new DispositionWrapper(); - + private boolean _role; private UnsignedInteger _first; private UnsignedInteger _last; @@ -114,7 +114,7 @@ public class Disposition { _batchable = batchable; } - + public Object getDescriptor() { return DESCRIPTOR; @@ -124,7 +124,7 @@ public class Disposition { return _wrapper; } - + public Object get(final int index) { @@ -141,7 +141,7 @@ public class Disposition case 4: return _state; case 5: - return _batchable; + return _batchable; } throw new IllegalStateException("Unknown index " + index); @@ -151,14 +151,14 @@ public class Disposition public int size() { return (_batchable != false) - ? 6 - : _state != null - ? 5 + ? 6 + : _state != null + ? 5 : (_settled != false) - ? 4 - : _last != null - ? 3 - : 2; + ? 4 + : _last != null + ? 3 + : 2; } @@ -235,5 +235,17 @@ public class Disposition decoder.register(descriptor, constructor); } } + + @Override + public String toString() + { + return "Disposition{" + + "role=" + (_role ? "RECIEVER" : "SENDER") + + ", first=" + _first + + ", last=" + _last + + ", settled=" + _settled + + ", state=" + _state + + ", batchable=" + _batchable + + '}'; + } } - \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org