Author: rgodfrey
Date: Wed May 30 16:39:47 2012
New Revision: 1344343

URL: http://svn.apache.org/viewvc?rev=1344343&view=rev
Log:
proton-j : get disposition working

Added:
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java
Modified:
    qpid/proton/trunk/proton-j/jproton.py
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java

Modified: qpid/proton/trunk/proton-j/jproton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/jproton.py?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/jproton.py (original)
+++ qpid/proton/trunk/proton-j/jproton.py Wed May 30 16:39:47 2012
@@ -197,3 +197,6 @@ def pn_remote_disp(d):
 def pn_local_disp(d):
   if(d.getLocalState() == Accepted.getInstance()):
     return PN_ACCEPTED
+
+def pn_settle(d):  # Settles delivery d by delegating to its settle() method; returns nothing.
+  d.settle()

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
 Wed May 30 16:39:47 2012
@@ -53,6 +53,8 @@ public class DeliveryImpl implements Del
     private byte[] _data;
     private int _dataSize;
     private boolean _complete;
+    private boolean _updated;
+    private boolean _done;
 
     public DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl 
previous)
     {
@@ -104,6 +106,7 @@ public class DeliveryImpl implements Del
     public void settle() // Marks this delivery as locally settled.
     {
         _settled = true;
+        setTransportFlag(DELIVERY_STATE_CHANGED); // record the change for the transport; presumably this is what lets a disposition frame be emitted -- confirm against setTransportFlag
     }
 
     DeliveryImpl getLinkNext()
@@ -388,4 +391,26 @@ public class DeliveryImpl implements Del
     {
         _complete = true;
     }
+
+    void setRemoteDeliveryState(DeliveryState remoteDeliveryState) // Stores the delivery state reported by the remote peer (may be null).
+    {
+        _remoteDeliveryState = remoteDeliveryState;
+        _updated = true; // set on every call, even when the state is unchanged; nothing in this revision resets it
+    }
+
+    public boolean isUpdated() // Returns the _updated flag, which setRemoteDeliveryState sets to true.
+    {
+        return _updated;
+    }
+
+
+    void setDone() // Marks this delivery as done; the flag is one-way, no visible code clears it.
+    {
+        _done = true;
+    }
+
+    boolean isDone() // Returns whether setDone() has been called on this delivery.
+    {
+        return _done;
+    }
 }

Added: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java?rev=1344343&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java
 (added)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryStateConverter.java
 Wed May 30 16:39:47 2012
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine.impl;
+
+import org.apache.qpid.proton.engine.Accepted;
+
+class DeliveryStateConverter // Translates transport-level delivery states into engine-level DeliveryState objects.
+{
+
+    public static org.apache.qpid.proton.engine.DeliveryState 
convert(org.apache.qpid.proton.type.transport.DeliveryState state)
+    {
+        if(state instanceof org.apache.qpid.proton.type.messaging.Accepted) // Accepted is the only state translated so far
+        {
+            return Accepted.getInstance(); // the imported org.apache.qpid.proton.engine.Accepted, not the messaging type tested above
+        }
+        return null; // any other state, and a null argument, yields null
+    }
+}

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java 
(original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java 
Wed May 30 16:39:47 2012
@@ -116,6 +116,7 @@ public abstract class LinkImpl extends E
     {
         if(_current != null )
         {
+            _current.setDone();
             DeliveryImpl oldCurrent = _current;
             _current = _current.getLinkNext();
             getConnectionImpl().workUpdate(oldCurrent);

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
 Wed May 30 16:39:47 2012
@@ -107,7 +107,6 @@ public class TransportImpl extends Endpo
 
     public int output(byte[] bytes, final int offset, final int size)
     {
-
         int written = 0;
 
         written += processHeader(bytes, offset);
@@ -189,7 +188,38 @@ public class TransportImpl extends Endpo
 
     private int processSenderDisposition(byte[] bytes, int offset, int length) // Writes one Disposition frame per sender-side delivery on the transport work list whose local state changed; returns the total of the writeFrame results.
     {
-        return 0;  //TODO - Implement
+        DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
+        int written = 0;
+        while(delivery != null && length >= _maxFrameSize) // stop as soon as fewer than _maxFrameSize bytes of the buffer remain
+        {
+            if((delivery.getLink() instanceof SenderImpl) && 
delivery.isLocalStateChange())
+            {
+                TransportDelivery transportDelivery = 
delivery.getTransportDelivery();
+                Disposition disposition = new Disposition();
+                disposition.setFirst(transportDelivery.getDeliveryId()); // first == last: each frame covers exactly one delivery
+                disposition.setLast(transportDelivery.getDeliveryId());
+                disposition.setRole(Role.SENDER);
+                disposition.setSettled(delivery.isSettled()); // carry the local settled flag to the peer
+                DeliveryState deliveryState = delivery.getLocalState();
+                if(deliveryState == Accepted.getInstance()) // identity comparison against the shared Accepted instance
+                {
+                    disposition.setState(ACCEPTED);
+                }
+                else
+                {
+                    // TODO: local states other than Accepted are not mapped yet, so setState is not called for them
+                }
+                int frameBytes = writeFrame(bytes, offset, length, 
delivery.getLink().getSession()
+                                                                  
.getTransportSession().getLocalChannel(),
+                                   disposition, null);
+                written += frameBytes; // writeFrame's result is treated as the number of bytes it placed in the buffer
+                offset += frameBytes;
+                length -= frameBytes;
+            } // NOTE(review): nothing visible here clears the local-state-change condition after the frame is written -- confirm it is reset elsewhere
+            delivery = delivery.getTransportWorkNext();
+        }
+        return written;
+
     }
 
     private int processMessageData(byte[] bytes, int offset, int length)
@@ -198,7 +228,7 @@ public class TransportImpl extends Endpo
         int written = 0;
         while(delivery != null && length >= _maxFrameSize)
         {
-            if((delivery.getLink() instanceof SenderImpl))
+            if((delivery.getLink() instanceof SenderImpl) && 
!(delivery.isDone() && delivery.getDataLength() == 0))
             {
                 SenderImpl sender = (SenderImpl) delivery.getLink();
 
@@ -208,7 +238,8 @@ public class TransportImpl extends Endpo
 
                 UnsignedInteger deliveryId = transportLink.getDeliveryCount();
                 TransportDelivery transportDelivery = new 
TransportDelivery(deliveryId, delivery, transportLink);
-
+                delivery.setTransportDelivery(transportDelivery);
+                
sender.getSession().getTransportSession().addUnsettledOutgoing(deliveryId, 
delivery);
 
                 Transfer transfer = new Transfer();
                 transfer.setDeliveryId(deliveryId);

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
 Wed May 30 16:39:47 2012
@@ -49,7 +49,10 @@ class TransportSession
     private UnsignedInteger _remoteOutgoingWindow;
     private UnsignedInteger _remoteNextIncomingId;
     private UnsignedInteger _remoteNextOutgoingId;
-    private Map<UnsignedInteger, DeliveryImpl> _unsettledDeliveriesById = new 
HashMap<UnsignedInteger, DeliveryImpl>();
+    private Map<UnsignedInteger, DeliveryImpl>
+            _unsettledIncomingDeliveriesById = new HashMap<UnsignedInteger, 
DeliveryImpl>();
+    private Map<UnsignedInteger, DeliveryImpl>
+            _unsettledOutgoingDeliveriesById = new HashMap<UnsignedInteger, 
DeliveryImpl>();
 
     public TransportSession(SessionImpl session)
     {
@@ -188,7 +191,7 @@ class TransportSession
             TransportReceiver transportReceiver = (TransportReceiver) 
getLinkFromRemoteHandle(transfer.getHandle());
             ReceiverImpl receiver = transportReceiver.getReceiver();
             Binary deliveryTag = transfer.getDeliveryTag();
-            delivery = _unsettledDeliveriesById.get(_currentDeliveryId);
+            delivery = 
_unsettledIncomingDeliveriesById.get(_currentDeliveryId);
 
 
         }
@@ -204,7 +207,7 @@ class TransportSession
                                                       deliveryTag.getLength());
             TransportDelivery transportDelivery = new 
TransportDelivery(_currentDeliveryId, delivery, transportReceiver);
             delivery.setTransportDelivery(transportDelivery);
-            _unsettledDeliveriesById.put(_currentDeliveryId, delivery);
+            _unsettledIncomingDeliveriesById.put(_currentDeliveryId, delivery);
 
 
         }
@@ -270,6 +273,29 @@ class TransportSession
 
     void handleDisposition(Disposition disposition)
     {
+        // Applies the state carried by an incoming Disposition frame to every
+        // tracked delivery whose id lies in the inclusive range [first, last].
+        UnsignedInteger id = disposition.getFirst();
+        // A frame without "last" addresses only the delivery named by "first".
+        UnsignedInteger last = disposition.getLast() == null ? id : disposition.getLast();
+        // A true role flag is the receiver role (see Disposition.toString), so the
+        // ids then refer to deliveries this side sent; otherwise to ones it received.
+        final Map<UnsignedInteger, DeliveryImpl> unsettledDeliveries =
+                disposition.getRole() ? _unsettledOutgoingDeliveriesById
+                        : _unsettledIncomingDeliveriesById;
+
+        while(id.compareTo(last)<=0)
+        {
+            DeliveryImpl delivery = unsettledDeliveries.get(id);
+            // Unknown ids are skipped, and a frame without a state leaves the
+            // recorded remote state untouched.
+            if(delivery != null && disposition.getState() != null)
+            {
+                delivery.setRemoteDeliveryState(DeliveryStateConverter.convert(disposition.getState()));
+            }
+            if(id.compareTo(last) == 0)
+            {
+                // Stop after handling "last" rather than incrementing past it. When
+                // "last" is the largest UnsignedInteger the incremented id can never
+                // compare greater (assuming add() wraps at 2^32 -- TODO confirm), so
+                // the loop condition alone would never become false.
+                break;
+            }
+            id = id.add(UnsignedInteger.ONE);
+        }
+        // NOTE(review): the frame's settled flag is not read here yet, and a very
+        // wide [first, last] range is still walked one id at a time.
         //TODO - Implement.
     }
+
+    void addUnsettledOutgoing(UnsignedInteger deliveryId, DeliveryImpl 
delivery)
+    {
+        _unsettledOutgoingDeliveriesById.put(deliveryId, delivery); // register the delivery under its id so handleDisposition can find it; no removal from this map is visible in this revision
+    }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/messaging/Accepted.java
 Wed May 30 16:39:47 2012
@@ -36,13 +36,13 @@ public class Accepted
 {
     private static final Object[] DESCRIPTORS =
     {
-        UnsignedLong.valueOf(0x0000000000000024L), 
Symbol.valueOf("amqp:accepted:list"), 
+        UnsignedLong.valueOf(0x0000000000000024L), 
Symbol.valueOf("amqp:accepted:list"),
     };
 
     private static final UnsignedLong DESCRIPTOR = 
UnsignedLong.valueOf(0x0000000000000024L);
     private final AcceptedWrapper _wrapper = new AcceptedWrapper();
-    
-    
+
+
     public Object getDescriptor()
     {
         return DESCRIPTOR;
@@ -52,7 +52,7 @@ public class Accepted
     {
         return _wrapper;
     }
-    
+
     public Object get(final int index)
     {
 
@@ -62,7 +62,7 @@ public class Accepted
 
     public int size()
     {
-        return 0;        
+        return 0;
 
     }
 
@@ -110,5 +110,10 @@ public class Accepted
             decoder.register(descriptor, constructor);
         }
     }
+
+    @Override
+    public String toString() // Fixed textual form; this type reports size() 0, so there are no fields to print.
+    {
+        return "Accepted{}";
+    }
 }
-  
\ No newline at end of file

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java?rev=1344343&r1=1344342&r2=1344343&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Disposition.java
 Wed May 30 16:39:47 2012
@@ -37,12 +37,12 @@ public class Disposition
 {
     private static final Object[] DESCRIPTORS =
     {
-        UnsignedLong.valueOf(0x0000000000000015L), 
Symbol.valueOf("amqp:disposition:list"), 
+        UnsignedLong.valueOf(0x0000000000000015L), 
Symbol.valueOf("amqp:disposition:list"),
     };
 
     private static final UnsignedLong DESCRIPTOR = 
UnsignedLong.valueOf(0x0000000000000015L);
     private final DispositionWrapper _wrapper = new DispositionWrapper();
-    
+
     private boolean _role;
     private UnsignedInteger _first;
     private UnsignedInteger _last;
@@ -114,7 +114,7 @@ public class Disposition
     {
         _batchable = batchable;
     }
-    
+
     public Object getDescriptor()
     {
         return DESCRIPTOR;
@@ -124,7 +124,7 @@ public class Disposition
     {
         return _wrapper;
     }
-    
+
     public Object get(final int index)
     {
 
@@ -141,7 +141,7 @@ public class Disposition
             case 4:
                 return _state;
             case 5:
-                return _batchable;            
+                return _batchable;
         }
 
         throw new IllegalStateException("Unknown index " + index);
@@ -151,14 +151,14 @@ public class Disposition
     public int size()
     {
         return (_batchable != false)
-                  ? 6 
-                  : _state != null 
-                  ? 5 
+                  ? 6
+                  : _state != null
+                  ? 5
                   : (_settled != false)
-                  ? 4 
-                  : _last != null 
-                  ? 3 
-                  : 2;        
+                  ? 4
+                  : _last != null
+                  ? 3
+                  : 2;
 
     }
 
@@ -235,5 +235,17 @@ public class Disposition
             decoder.register(descriptor, constructor);
         }
     }
+
+    @Override
+    public String toString() // Renders all six frame fields for logging and debugging.
+    {
+        return "Disposition{" +
+               "role=" + (_role ? "RECEIVER" : "SENDER") + // a true role flag denotes the receiver role
+               ", first=" + _first +
+               ", last=" + _last +
+               ", settled=" + _settled +
+               ", state=" + _state +
+               ", batchable=" + _batchable +
+               '}';
+    }
 }
-  
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to