Author: gsim
Date: Tue Dec 11 13:11:00 2012
New Revision: 1420140

URL: http://svn.apache.org/viewvc?rev=1420140&view=rev
Log:
PROTON-118: add initial implementation of messenger

Added:
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
Modified:
    qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+/**
+ * Selects how a messenger acknowledges the messages it hands to the
+ * application.
+ */
+public enum AcceptMode
+{
+    /** The messenger accepts each incoming message itself as it is retrieved. */
+    AUTO,
+
+    /**
+     * The messenger does not accept incoming messages on retrieval.
+     * NOTE(review): presumably the application then accepts or rejects them
+     * through a tracker - confirm against the messenger documentation.
+     */
+    MANUAL;
+}
+

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Address-oriented API for sending and receiving messages.
+ */
+public interface Messenger
+{
+    /**
+     * Flag accepted by the {@code flags} argument of {@link #reject},
+     * {@link #accept} and {@link #settle}.
+     * NOTE(review): presumably extends the operation to every delivery up to
+     * the given tracker - confirm against the tracker queue implementation.
+     */
+    int CUMULATIVE = 0x01;
+
+    /**
+     * Queues a message for delivery to the address carried by the message.
+     *
+     * @param message the message to send
+     * @throws MessengerException if the message cannot be queued, e.g. because
+     *         its address is unusable
+     */
+    void put(Message message) throws MessengerException;
+
+    /**
+     * Blocks until the queued outgoing messages have been dealt with or the
+     * configured timeout elapses.
+     *
+     * @throws TimeoutException declared for implementations that report an
+     *         expired timeout
+     */
+    void send() throws TimeoutException;
+
+    /**
+     * Starts receiving from the given source address.
+     *
+     * @param source address to receive from
+     * @throws MessengerException if the source address is not usable
+     */
+    void subscribe(String source) throws MessengerException;
+
+    /**
+     * Makes room for up to {@code count} further incoming messages and blocks
+     * until at least one message is available or the timeout elapses.
+     *
+     * @param count number of additional messages the caller is prepared to receive
+     * @throws TimeoutException declared for implementations that report an
+     *         expired timeout
+     */
+    void recv(int count) throws TimeoutException;
+
+    /**
+     * Retrieves the next received message.
+     *
+     * @return the message, or {@code null} when none is available
+     */
+    Message get();
+
+    /**
+     * Prepares the messenger for use.
+     *
+     * @throws IOException if the underlying I/O machinery cannot be set up
+     */
+    void start() throws IOException;
+
+    /** Shuts the messenger down and releases its connections and listeners. */
+    void stop();
+
+    /**
+     * Sets how long blocking operations may wait.
+     *
+     * @param timeInMillis timeout in milliseconds
+     */
+    void setTimeout(long timeInMillis);
+
+    /** @return the timeout for blocking operations, in milliseconds */
+    long getTimeout();
+
+    /** @return the number of messages queued for sending */
+    int outgoing();
+
+    /** @return the number of received messages queued for retrieval */
+    int incoming();
+
+    /** @return the current accept mode */
+    AcceptMode getAcceptMode();
+
+    /** @param mode the accept mode to apply to incoming messages */
+    void setAcceptMode(AcceptMode mode);
+
+    /** @return the size of the incoming tracking window */
+    int getIncomingWindow();
+
+    /** @param window the size of the incoming tracking window */
+    void setIncomingWindow(int window);
+
+    /** @return the size of the outgoing tracking window */
+    int getOutgoingWindow();
+
+    /** @param window the size of the outgoing tracking window */
+    void setOutgoingWindow(int window);
+
+    /** @return a tracker referring to the incoming side of the messenger */
+    Tracker incomingTracker();
+
+    /** @return a tracker referring to the outgoing side of the messenger */
+    Tracker outgoingTracker();
+
+    /**
+     * Rejects the delivery identified by the tracker.
+     *
+     * @param tracker identifies the delivery
+     * @param flags   zero or {@link #CUMULATIVE}
+     */
+    void reject(Tracker tracker, int flags);
+
+    /**
+     * Accepts the delivery identified by the tracker.
+     *
+     * @param tracker identifies the delivery
+     * @param flags   zero or {@link #CUMULATIVE}
+     */
+    void accept(Tracker tracker, int flags);
+
+    /**
+     * Settles the delivery identified by the tracker.
+     *
+     * @param tracker identifies the delivery
+     * @param flags   zero or {@link #CUMULATIVE}
+     */
+    void settle(Tracker tracker, int flags);
+
+    /**
+     * Reports the status of the delivery identified by the tracker.
+     *
+     * @param tracker identifies the delivery
+     * @return the delivery's status
+     */
+    Status getStatus(Tracker tracker);
+}

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.messenger;
+
+/**
+ * Checked exception raised by messenger operations that cannot be carried
+ * out, for example because an address is invalid.
+ */
+public class MessengerException extends Exception
+{
+    /** Creates an exception with neither message nor cause. */
+    public MessengerException()
+    {
+        super();
+    }
+
+    /**
+     * @param message description of the failure
+     */
+    public MessengerException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param cause   the underlying problem
+     */
+    public MessengerException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the underlying problem
+     */
+    public MessengerException(Throwable cause)
+    {
+        super(cause);
+    }
+
+}

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+/**
+ * Delivery status reported for a tracker by {@code Messenger.getStatus}.
+ * NOTE(review): the per-constant meanings below are inferred from the names;
+ * confirm against the tracker queue implementation.
+ */
+public enum Status
+{
+    /** Presumably: nothing is known about the tracked delivery. */
+    UNKNOWN,
+
+    /** Presumably: no outcome has been reached yet. */
+    PENDING,
+
+    /** Presumably: the delivery was accepted. */
+    ACCEPTED,
+
+    /** Presumably: the delivery was rejected. */
+    REJECTED;
+}

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+/**
+ * Opaque handle handed out by a messenger and passed back to it when
+ * accepting, rejecting, settling or querying deliveries. It declares no
+ * members of its own.
+ */
+public interface Tracker
+{
+}
+

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,877 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.driver.impl.DriverImpl;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.AcceptMode;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Source;
+import org.apache.qpid.proton.type.messaging.Target;
+
+public class MessengerImpl implements Messenger
+{
+    //Endpoint-state sets passed as the local/remote state filters to the
+    //Links and Sessions iterables used throughout this class.
+    private static final EnumSet<EndpointState> UNINIT = 
EnumSet.of(EndpointState.UNINITIALIZED);
+    private static final EnumSet<EndpointState> ACTIVE = 
EnumSet.of(EndpointState.ACTIVE);
+    private static final EnumSet<EndpointState> CLOSED = 
EnumSet.of(EndpointState.CLOSED);
+    private static final EnumSet<EndpointState> ANY = 
EnumSet.allOf(EndpointState.class);
+    //Shared "accepted" delivery state (not referenced in this part of the file).
+    private static final Accepted ACCEPTED = new Accepted();
+
+    private final Logger _logger = Logger.getLogger("proton.messenger");
+    //Name set as the container of every connection this messenger creates.
+    private final String _name;
+    //Timeout for blocking calls in milliseconds; a negative value means no deadline.
+    private long _timeout = -1;
+    //Counter whose decimal string form supplies the tag of each outgoing delivery.
+    private long _nextTag = 1;
+    //Scratch buffer for encoding and decoding messages; doubled when too small.
+    private byte[] _buffer = new byte[5*1024];
+    //Created by start(); null until then.
+    private Driver _driver;
+    //Credit requested through recv() that has not yet been issued to a receiver link.
+    private int _credit;
+    //Credit issued to receiver links and not yet used up by get() or reclaimed.
+    private int _distributed;
+    private AcceptMode _acceptMode = AcceptMode.AUTO;
+    //Tracking queues for received and sent deliveries respectively.
+    private TrackerQueue _incoming = new TrackerQueue();
+    private TrackerQueue _outgoing = new TrackerQueue();
+
+    /**
+     * Creates a messenger named after a freshly generated random UUID.
+     */
+    public MessengerImpl()
+    {
+        this(java.util.UUID.randomUUID().toString());
+    }
+
+    /**
+     * Creates a messenger with the given name.
+     *
+     * @param name the name used as the container of this messenger's connections
+     */
+    public MessengerImpl(String name)
+    {
+        this._name = name;
+    }
+
+    /**
+     * Sets the timeout applied by the blocking operations.
+     *
+     * @param timeInMillis timeout in milliseconds; a negative value removes the deadline
+     */
+    public void setTimeout(long timeInMillis)
+    {
+        this._timeout = timeInMillis;
+    }
+
+    /**
+     * @return the timeout in milliseconds (negative when no deadline applies)
+     */
+    public long getTimeout()
+    {
+        return this._timeout;
+    }
+
+    /**
+     * Creates the driver used for all subsequent I/O.
+     *
+     * @throws IOException declared by the interface; NOTE(review): presumably
+     *         raised when the driver cannot be created - confirm against DriverImpl
+     */
+    public void start() throws IOException
+    {
+        this._driver = new DriverImpl();
+    }
+
+    /**
+     * Shuts the messenger down: closes every connection (processing its
+     * connector once so the close can be written out), closes every listener
+     * and finally destroys the driver.
+     * <p>
+     * I/O errors met along the way are logged at WARNING level and do not
+     * interrupt the shutdown. Calling this method before {@link #start()}
+     * is a no-op.
+     */
+    public void stop()
+    {
+        if (_driver == null)
+        {
+            //start() was never called, so there is nothing to shut down
+            return;
+        }
+        //close all connections
+        for (Connector c : _driver.connectors())
+        {
+            Connection connection = c.getConnection();
+            connection.close();
+            try
+            {
+                c.process();
+            }
+            catch (IOException e)
+            {
+                _logger.log(Level.WARNING, "Error while sending close", e);
+            }
+        }
+        //stop listeners
+        for (Listener l : _driver.listeners())
+        {
+            try
+            {
+                l.close();
+            }
+            catch (IOException e)
+            {
+                _logger.log(Level.WARNING, "Error while closing listener", e);
+            }
+        }
+        _driver.destroy();
+    }
+
+    /**
+     * Encodes the message and queues it on a sender link to the message's
+     * address, creating the connection and link if necessary. The resulting
+     * delivery is added to the outgoing tracker queue.
+     *
+     * @param m the message to send; its address must be a URI with a host
+     * @throws MessengerException if the address is missing, has no host or is
+     *         not a valid URI
+     */
+    public void put(Message m) throws MessengerException
+    {
+        String rawAddress = m.getAddress();
+        if (rawAddress == null)
+        {
+            //new URI(null) would otherwise fail with a NullPointerException
+            throw new MessengerException("unable to send to address: " + rawAddress);
+        }
+        try
+        {
+            URI address = new URI(rawAddress);
+            if (address.getHost() == null)
+            {
+                throw new MessengerException("unable to send to address: " + rawAddress);
+            }
+            int port = address.getPort() < 0 ? defaultPort(address.getScheme()) : address.getPort();
+            Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getPath()));
+
+            adjustReplyTo(m);
+
+            //the delivery tag is the decimal form of a per-messenger counter
+            byte[] tag = String.valueOf(_nextTag++).getBytes();
+            Delivery delivery = sender.delivery(tag);
+            int encoded;
+            while (true)
+            {
+                try
+                {
+                    encoded = m.encode(_buffer, 0, _buffer.length);
+                    break;
+                }
+                catch (java.nio.BufferOverflowException e)
+                {
+                    //message did not fit: double the scratch buffer and retry
+                    _buffer = new byte[_buffer.length*2];
+                }
+            }
+            sender.send(_buffer, 0, encoded);
+            _outgoing.add(delivery);
+            sender.advance();
+        }
+        catch (URISyntaxException e)
+        {
+            throw new MessengerException("Invalid address: " + rawAddress, e);
+        }
+    }
+
+    /**
+     * Blocks until the sent-and-settled condition holds or the configured
+     * timeout has elapsed.
+     *
+     * @throws TimeoutException propagated from the wait
+     */
+    public void send() throws TimeoutException
+    {
+        waitUntil(_sentSettled, _timeout);
+    }
+
+    /**
+     * Adds {@code n} to the credit pool, hands credit out to the receiver
+     * links and then blocks until a message is available or the configured
+     * timeout has elapsed.
+     *
+     * @param n number of additional messages the caller is prepared to receive
+     * @throws TimeoutException propagated from the wait
+     */
+    public void recv(int n) throws TimeoutException
+    {
+        _credit = _credit + n;
+        distributeCredit();
+        waitUntil(_messageAvailable, _timeout);
+    }
+
+    /**
+     * Returns the first readable delivery found on any connection's work list,
+     * decoded into a message. The delivery is added to the incoming tracker
+     * queue (and accepted straight away in AUTO accept mode), the distributed
+     * credit count is decremented and the link is advanced.
+     *
+     * @return the decoded message, or {@code null} if no delivery is readable
+     */
+    public Message get()
+    {
+        for (Connector connector : _driver.connectors())
+        {
+            Connection connection = connector.getConnection();
+            _logger.log(Level.FINE, "Attempting to get message from " + connection);
+            for (Delivery candidate = connection.getWorkHead(); candidate != null; candidate = candidate.getWorkNext())
+            {
+                if (!candidate.isReadable())
+                {
+                    _logger.log(Level.FINE, "Delivery not readable: " + candidate);
+                    continue;
+                }
+
+                _logger.log(Level.FINE, "Readable delivery found: " + candidate);
+                Receiver receiver = (Receiver) candidate.getLink();
+                int size = read(receiver);
+                Message message = new Message();
+                message.decode(_buffer, 0, size);
+                _incoming.add(candidate);
+                if (_acceptMode == AcceptMode.AUTO)
+                {
+                    _incoming.accept(incomingTracker());
+                }
+                _distributed--;
+                candidate.getLink().advance();
+                return message;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Subscribes to a source address. An address containing '~' makes the
+     * messenger listen on the given host and port; any other address makes it
+     * connect out and attach a receiver link for the address path.
+     *
+     * @param source the source address, a URI with a host
+     * @throws MessengerException if the address has no host or is not a valid URI
+     */
+    public void subscribe(String source) throws MessengerException
+    {
+        //the following is not safe or accurate, but it appears '~' is
+        //invalid as the start of the hostname and URI can't handle
+        //it, so this is a quick hack to avoid rewriting the parsing
+        //logic for URLs right now...
+        final boolean listen = source.contains("~");
+        try
+        {
+            URI address = new URI(listen ? source.replace("~", "") : source);
+            String host = address.getHost();
+            if (host == null)
+            {
+                throw new MessengerException("Invalid source address (hostname cannot be null): " + source);
+            }
+            int port = address.getPort();
+            if (port < 0)
+            {
+                port = defaultPort(address.getScheme());
+            }
+            if (!listen)
+            {
+                getLink(host, port, new ReceiverFinder(address.getPath()));
+            }
+            else
+            {
+                _driver.createListener(host, port, null);
+            }
+        }
+        catch (URISyntaxException e)
+        {
+            throw new MessengerException("Invalid source: " + source, e);
+        }
+
+    }
+
+    /**
+     * @return the total queued count over all active sender links
+     */
+    public int outgoing()
+    {
+        final boolean senders = true;
+        return queued(senders);
+    }
+
+    /**
+     * @return the total queued count over all active receiver links
+     */
+    public int incoming()
+    {
+        final boolean senders = false;
+        return queued(senders);
+    }
+
+
+    /**
+     * @return the accept mode currently in force
+     */
+    public AcceptMode getAcceptMode()
+    {
+        return this._acceptMode;
+    }
+
+    /**
+     * @param mode the accept mode consulted by get() for incoming messages
+     */
+    public void setAcceptMode(AcceptMode mode)
+    {
+        this._acceptMode = mode;
+    }
+
+    /**
+     * @return the window of the incoming tracker queue
+     */
+    public int getIncomingWindow()
+    {
+        final TrackerQueue queue = _incoming;
+        return queue.getWindow();
+    }
+
+    /**
+     * @param window the window to set on the incoming tracker queue
+     */
+    public void setIncomingWindow(int window)
+    {
+        final TrackerQueue queue = _incoming;
+        queue.setWindow(window);
+    }
+
+    /**
+     * @return the window of the outgoing tracker queue
+     */
+    public int getOutgoingWindow()
+    {
+        final TrackerQueue queue = _outgoing;
+        return queue.getWindow();
+    }
+
+    /**
+     * @param window the window to set on the outgoing tracker queue
+     */
+    public void setOutgoingWindow(int window)
+    {
+        final TrackerQueue queue = _outgoing;
+        queue.setWindow(window);
+    }
+
+    /**
+     * Creates a tracker for the incoming queue, positioned one below that
+     * queue's high-water mark.
+     * NOTE(review): presumably this identifies the most recently added
+     * incoming delivery - confirm against TrackerQueue.
+     *
+     * @return a new incoming tracker
+     */
+    public Tracker incomingTracker()
+    {
+        final boolean outgoing = false;
+        return new TrackerImpl(outgoing, _incoming.getHighWaterMark() - 1);
+    }
+
+    /**
+     * Creates a tracker for the outgoing queue, positioned one below that
+     * queue's high-water mark.
+     * NOTE(review): presumably this identifies the most recently added
+     * outgoing delivery - confirm against TrackerQueue.
+     *
+     * @return a new outgoing tracker
+     */
+    public Tracker outgoingTracker()
+    {
+        final boolean outgoing = true;
+        return new TrackerImpl(outgoing, _outgoing.getHighWaterMark() - 1);
+    }
+
+    /**
+     * Picks the tracker queue a tracker belongs to.
+     *
+     * @param tracker the tracker to classify
+     * @return the outgoing queue for an outgoing tracker, otherwise the incoming queue
+     */
+    private TrackerQueue getTrackerQueue(Tracker tracker)
+    {
+        if (TrackerQueue.isOutgoing(tracker))
+        {
+            return _outgoing;
+        }
+        return _incoming;
+    }
+    /**
+     * Forwards a reject request to the queue the tracker belongs to.
+     *
+     * @param tracker identifies the delivery
+     * @param flags   passed through unchanged to the tracker queue
+     */
+    public void reject(Tracker tracker, int flags)
+    {
+        TrackerQueue queue = getTrackerQueue(tracker);
+        queue.reject(tracker, flags);
+    }
+
+    /**
+     * Forwards an accept request to the queue the tracker belongs to.
+     *
+     * @param tracker identifies the delivery
+     * @param flags   passed through unchanged to the tracker queue
+     */
+    public void accept(Tracker tracker, int flags)
+    {
+        TrackerQueue queue = getTrackerQueue(tracker);
+        queue.accept(tracker, flags);
+    }
+
+    /**
+     * Forwards a settle request to the queue the tracker belongs to.
+     *
+     * @param tracker identifies the delivery
+     * @param flags   passed through unchanged to the tracker queue
+     */
+    public void settle(Tracker tracker, int flags)
+    {
+        TrackerQueue queue = getTrackerQueue(tracker);
+        queue.settle(tracker, flags);
+    }
+
+    /**
+     * Looks up the status of a tracked delivery in the queue the tracker
+     * belongs to.
+     *
+     * @param tracker identifies the delivery
+     * @return the status reported by the tracker queue
+     */
+    public Status getStatus(Tracker tracker)
+    {
+        TrackerQueue queue = getTrackerQueue(tracker);
+        return queue.getStatus(tracker);
+    }
+
+    /**
+     * Sums the queued counts of all locally active links of one direction,
+     * across every connection known to the driver.
+     *
+     * @param outgoing {@code true} to count sender links, {@code false} to
+     *                 count receiver links
+     * @return the summed queued count
+     */
+    private int queued(boolean outgoing)
+    {
+        int total = 0;
+        for (Connector connector : _driver.connectors())
+        {
+            for (Link link : new Links(connector.getConnection(), ACTIVE, ANY))
+            {
+                boolean wanted = outgoing ? link instanceof Sender : link instanceof Receiver;
+                if (wanted)
+                {
+                    total += link.getQueued();
+                }
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Reads the receiver's current delivery into the scratch buffer, doubling
+     * the buffer whenever a read fills it completely.
+     * <p>
+     * Only positive return values from {@code recv} count as bytes read. A
+     * zero or negative value ends the read without changing the total.
+     * NOTE(review): this assumes a negative value is an end-of-stream marker -
+     * confirm against the Receiver.recv contract.
+     *
+     * @param receiver the link whose current delivery is to be read
+     * @return the number of bytes now held at the start of the scratch buffer
+     */
+    private int read(Receiver receiver)
+    {
+        //TODO: add pending count to Delivery?
+        int total = 0;
+        int start = 0;
+        while (true)
+        {
+            int available = _buffer.length - start;
+            int read = receiver.recv(_buffer, start, available);
+            if (read <= 0)
+            {
+                //nothing more to read; a negative value is a status, not a byte count
+                break;
+            }
+            total += read;
+            if (read < available)
+            {
+                break;
+            }
+            //the buffer was filled exactly, so there may be more data:
+            //expand the buffer (is there a better test?) and read again
+            _buffer = java.util.Arrays.copyOf(_buffer, _buffer.length*2);
+            start += read;
+        }
+        return total;
+    }
+
+    /**
+     * Runs one full processing pass: first every connector, then the
+     * listeners and connectors the driver reports as active.
+     */
+    private void process()
+    {
+        this.processAllConnectors();
+        this.processActive();
+    }
+
+    /**
+     * Calls process() on every connector of the driver. An I/O failure on one
+     * connector is logged at SEVERE level and does not stop the others from
+     * being processed.
+     */
+    private void processAllConnectors()
+    {
+        for (Connector connector : _driver.connectors())
+        {
+            try
+            {
+                connector.process();
+            }
+            catch (IOException ioe)
+            {
+                _logger.log(Level.SEVERE, "Error processing connection", ioe);
+            }
+        }
+    }
+
+    /**
+     * Handles everything the driver currently reports as active.
+     * <p>
+     * Each active listener has its pending connection accepted, given a new
+     * connection object named after this messenger, set up for ANONYMOUS
+     * SASL and opened. Each active connector is then processed: its
+     * connection is opened if still uninitialised, updated sender deliveries
+     * take over the remote state, sessions and links are opened, credit is
+     * distributed, remotely closed endpoints are closed locally, and the
+     * connector is either destroyed (if closed) or processed once more.
+     */
+    private void processActive()
+    {
+        //process active listeners
+        for (Listener l = _driver.listener(); l != null; l = 
_driver.listener())
+        {
+            Connector c = l.accept();
+            Connection connection = new ConnectionImpl();
+            connection.setContainer(_name);
+            c.setConnection(connection);
+            //TODO: SSL and full SASL
+            Sasl sasl = c.sasl();
+            if (sasl != null)
+            {
+                //act as SASL server, offer only ANONYMOUS and report success at once
+                sasl.server();
+                sasl.setMechanisms(new String[]{"ANONYMOUS"});
+                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+            }
+            connection.open();
+        }
+        //process active connectors, handling opened & closed connections as needed
+        for (Connector c = _driver.connector(); c != null; c = 
_driver.connector())
+        {
+            _logger.log(Level.FINE, "Processing active connector " + c);
+            try
+            {
+                c.process();
+            } catch (IOException e) {
+                _logger.log(Level.SEVERE, "Error processing connection", e);
+            }
+            Connection connection = c.getConnection();
+
+            if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+            {
+                connection.open();
+            }
+
+            //copy the remote state onto the local state of updated sender deliveries
+            Delivery delivery = connection.getWorkHead();
+            while (delivery != null)
+            {
+                if (delivery.getLink() instanceof Sender && 
delivery.isUpdated())
+                {
+                    delivery.disposition(delivery.getRemoteState());
+                }
+                //TODO: delivery.clear(); What's the equivalent in java?
+                delivery = delivery.getWorkNext();
+            }
+            _outgoing.slide();
+
+            //open sessions and links that are not yet initialised locally
+            for (Session session : new Sessions(connection, UNINIT, ANY))
+            {
+                session.open();
+                _logger.log(Level.FINE, "Opened session " + session);
+            }
+            for (Link link : new Links(connection, UNINIT, ANY))
+            {
+                //TODO: the following is not correct; should only copy those properties that we understand
+                link.setSource(link.getRemoteSource());
+                link.setTarget(link.getRemoteTarget());
+                link.open();
+                _logger.log(Level.FINE, "Opened link " + link);
+            }
+
+            distributeCredit();
+
+            //close locally whatever the remote peer has already closed
+            for (Link link : new Links(connection, ACTIVE, CLOSED))
+            {
+                link.close();
+            }
+            for (Session session : new Sessions(connection, ACTIVE, CLOSED))
+            {
+                session.close();
+            }
+            if (connection.getLocalState() == EndpointState.ACTIVE && 
connection.getRemoteState() == EndpointState.CLOSED)
+            {
+                connection.close();
+            }
+
+            if (c.isClosed())
+            {
+                //return the connection's unused credit to the pool before dropping the connector
+                reclaimCredit(connection);
+                c.destroy();
+            }
+            else
+            {
+                try
+                {
+                    c.process();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, "Error processing connection", 
e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Waits for a condition using the messenger's configured timeout.
+     *
+     * @param condition the condition to wait for
+     * @throws TimeoutException propagated from the timed wait
+     */
+    private void waitUntil(Predicate condition) throws TimeoutException
+    {
+        final long timeout = _timeout;
+        waitUntil(condition, timeout);
+    }
+
+    /**
+     * Processes I/O until the condition holds or the timeout has elapsed.
+     * <p>
+     * All connectors are processed once up front. The loop body then runs at
+     * least once, so active listeners and connectors are always processed,
+     * and the driver is only waited on from the second iteration onwards.
+     * <p>
+     * NOTE(review): nothing in this method throws TimeoutException; when the
+     * deadline passes it simply returns with the condition unmet.
+     *
+     * @param condition the condition to wait for
+     * @param timeout   maximum wait in milliseconds; negative means no deadline
+     * @throws TimeoutException declared, but see the note above
+     */
+    private void waitUntil(Predicate condition, long timeout) throws 
TimeoutException
+    {
+        processAllConnectors();
+
+        //wait until timeout expires or until test is true
+        long deadline = timeout < 0 ? Long.MAX_VALUE : 
System.currentTimeMillis() + timeout;
+
+        boolean wait = deadline > System.currentTimeMillis();
+        boolean first = true;
+        boolean done = condition.test();
+
+        while (first || (!done && wait))
+        {
+            //skip the blocking wait on the first pass
+            if (wait && !done && !first) {
+                _driver.doWait(deadline - System.currentTimeMillis());
+            }
+            processActive();
+            wait = deadline > System.currentTimeMillis();
+            //once the condition has held it is not tested again
+            done = done || condition.test();
+            first = false;
+        }
+    }
+
+    /**
+     * Searches the driver's connectors for a connection whose remote container
+     * equals {@code host} or whose context equals {@code service}.
+     *
+     * @param host    host name to compare with each connection's remote container
+     * @param service "host:port" string to compare with each connection's context
+     * @return the first matching connection, or {@code null} if there is none
+     */
+    private Connection lookup(String host, String service)
+    {
+        for (Connector connector : _driver.connectors())
+        {
+            Connection candidate = connector.getConnection();
+            boolean sameContainer = host.equals(candidate.getRemoteContainer());
+            if (sameContainer || service.equals(candidate.getContext()))
+            {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns to the pool the credit still held by every receiver link of the
+     * given connection, whatever the link's state.
+     *
+     * @param connection the connection whose receiver links are inspected
+     */
+    private void reclaimCredit(Connection connection)
+    {
+        for (Link link : new Links(connection, ANY, ANY))
+        {
+            if (!(link instanceof Receiver))
+            {
+                continue;
+            }
+            if (link.getCredit() > 0)
+            {
+                reclaimCredit(link.getCredit());
+            }
+        }
+    }
+
+    /**
+     * Moves the given amount of credit from the distributed count back to the
+     * undistributed pool.
+     *
+     * @param credit the amount of credit to move
+     */
+    private void reclaimCredit(int credit)
+    {
+        _distributed = _distributed - credit;
+        _credit = _credit + credit;
+    }
+
+    /**
+     * Hands pooled credit to the locally active receiver links, one unit per
+     * link per pass over all connectors. Each unit issued moves from the pool
+     * to the distributed count. The method stops when the pool is empty or
+     * when a whole pass issues nothing (there are no such receiver links).
+     */
+    private void distributeCredit()
+    {
+        //pool size at the start of the current pass, used to detect a pass that issued nothing
+        int previous = 0;
+        while (_credit > 0 && _credit != previous)
+        {
+            previous = _credit;
+            for (Connector c : _driver.connectors())
+            {
+                Connection connection = c.getConnection();
+                for (Link link : new Links(connection, ACTIVE, ANY))
+                {
+                    if (link instanceof Receiver)
+                    {
+                        ((Receiver) link).flow(1);
+                        _credit--;
+                        _distributed++;
+                        if (_credit == 0) return;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Condition polled by waitUntil() to decide when to stop waiting.
+     */
+    private interface Predicate
+    {
+        /**
+         * @return {@code true} once the condition is satisfied
+         */
+        boolean test();
+    }
+
+    /**
+     * Condition used by send(): true when no active sender link still has
+     * queued data and every delivery in the outgoing tracker queue passes
+     * checkSettled().
+     */
+    private class SentSettled implements Predicate
+    {
+        public boolean test()
+        {
+            //are all sent messages settled?
+            for (Connector c : _driver.connectors())
+            {
+                Connection connection = c.getConnection();
+                for (Link link : new Links(connection, ACTIVE, ACTIVE))
+                {
+                    if (link instanceof Sender)
+                    {
+                        if (link.getQueued() > 0)
+                        {
+                            return false;
+                        }
+                        //TODO: Sender.unsettled() not yet implemented, when it is change to the following
+                        //if (checkSettled(link.unsettled())
+                        //{
+                        //    return false;
+                        //}
+                    }
+                }
+            }
+            //TODO: Sender.unsettled() not yet implemented, when it is change to the following
+            //return true;
+            return checkSettled(_outgoing.deliveries());
+        }
+
+        /**
+         * Walks the deliveries, settling each one that has a remote state or
+         * is remotely settled. A delivery with neither is skipped if its
+         * connection is remotely closed; otherwise it makes the check fail.
+         * A null element ends the walk early.
+         *
+         * @param unsettled deliveries to check; may be null
+         * @return false if a delivery is still awaiting an outcome on a
+         *         connection that is not remotely closed, true otherwise
+         */
+        boolean checkSettled(Iterator<Delivery> unsettled)
+        {
+            if (unsettled != null)
+            {
+                while (unsettled.hasNext())
+                {
+                    Delivery d = unsettled.next();
+                    if (d == null)
+                    {
+                        break;
+                    }
+                    if (d.getRemoteState() != null || d.remotelySettled())
+                    {
+                        d.settle();
+                    }
+                    else if 
(d.getLink().getSession().getConnection().getRemoteState() == 
EndpointState.CLOSED)
+                    {
+                        //the peer has closed, so no outcome will arrive for this delivery
+                        continue;
+                    }
+                    else
+                    {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Condition used by recv(): true as soon as any connection's work list
+     * holds a readable delivery.
+     */
+    private class MessageAvailable implements Predicate
+    {
+        public boolean test()
+        {
+            //do we have at least one message?
+            for (Connector connector : _driver.connectors())
+            {
+                Connection connection = connector.getConnection();
+                for (Delivery d = connection.getWorkHead(); d != null; d = d.getWorkNext())
+                {
+                    //TODO: check for partial delivery?
+                    if (d.isReadable())
+                    {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+    }
+
+    //Reusable condition instances passed to waitUntil() by send() and recv().
+    private final SentSettled _sentSettled = new SentSettled();
+    private final MessageAvailable _messageAvailable = new MessageAvailable();
+
+    /**
+     * Strategy handed to getLink() for recognising an existing link of type
+     * {@code C} and for creating a new one.
+     */
+    private interface LinkFinder<C extends Link>
+    {
+        /**
+         * @param link a candidate link
+         * @return the link cast to {@code C} if it matches, otherwise {@code null}
+         */
+        C test(Link link);
+
+        /**
+         * @param session the session on which to create the link
+         * @return the newly created link
+         */
+        C create(Session session);
+    }
+
+    /**
+     * Link finder for sender links, matched on their target against a path.
+     */
+    private class SenderFinder implements LinkFinder<Sender>
+    {
+        private final String _path;
+
+        SenderFinder(String path)
+        {
+            this._path = path;
+        }
+
+        /**
+         * @return the link as a Sender if it is one and its target matches
+         *         the path, otherwise {@code null}
+         */
+        public Sender test(Link link)
+        {
+            if (!(link instanceof Sender))
+            {
+                return null;
+            }
+            Target target = (Target) link.getTarget();
+            return matchTarget(target, _path) ? (Sender) link : null;
+        }
+
+        /**
+         * @return a new sender on the session, created with the path as its argument
+         */
+        public Sender create(Session session)
+        {
+            return session.sender(_path);
+        }
+    }
+
+    /**
+     * LinkFinder for receivers: matches on the link's source against a fixed
+     * path and creates new receivers from that same path.
+     */
+    private class ReceiverFinder implements LinkFinder<Receiver>
+    {
+        private final String _path;
+
+        ReceiverFinder(String path)
+        {
+            _path = path;
+        }
+
+        /** Returns the link cast to Receiver when it is one and its source matches the path, else null. */
+        public Receiver test(Link link)
+        {
+            if (!(link instanceof Receiver))
+            {
+                return null;
+            }
+            boolean sourceMatches = matchSource((Source) link.getSource(), _path);
+            return sourceMatches ? (Receiver) link : null;
+        }
+
+        /** Asks the session for a receiver, passing the path. */
+        public Receiver create(Session session)
+        {
+            return session.receiver(_path);
+        }
+    }
+
+    /**
+     * Finds, or failing that creates, a link to the given host and port.
+     * <p>
+     * When lookup() yields no connection for "host:port", a connector and a
+     * new connection are created (container set to this messenger's name,
+     * hostname to host, context to "host:port"), SASL is set up as an
+     * ANONYMOUS client if the connector provides a SASL layer, and the
+     * connection is opened. The first link selected with ACTIVE and ANY that
+     * the finder accepts is returned; if there is none, a new session is
+     * opened and the finder creates the link on it, which is opened before
+     * being returned.
+     */
+    private <C extends Link> C getLink(String host, int port, LinkFinder<C> finder)
+    {
+        final String service = host + ":" + port;
+        Connection connection = lookup(host, service);
+        if (connection == null)
+        {
+            // nothing to reuse: wire a new connection to a new connector
+            Connector connector = _driver.createConnector(host, port, null);
+            _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+            connection = new ConnectionImpl();
+            connection.setContainer(_name);
+            connection.setHostname(host);
+            connection.setContext(service);
+            connector.setConnection(connection);
+            Sasl sasl = connector.sasl();
+            if (sasl != null)
+            {
+                sasl.client();
+                sasl.setMechanisms(new String[]{"ANONYMOUS"});
+            }
+            connection.open();
+        }
+
+        java.util.Iterator<Link> candidates = new Links(connection, ACTIVE, ANY).iterator();
+        while (candidates.hasNext())
+        {
+            C existing = finder.test(candidates.next());
+            if (existing != null)
+            {
+                return existing;
+            }
+        }
+
+        // no suitable link yet: open a session and let the finder create one on it
+        Session session = connection.session();
+        session.open();
+        C created = finder.create(session);
+        created.open();
+        return created;
+    }
+
+    /**
+     * Iterable over the links of a connection, selected by a pair of local
+     * and remote endpoint state sets. Each call to iterator() starts a fresh
+     * traversal.
+     */
+    private static class Links implements Iterable<Link>
+    {
+        private final Connection _source;
+        private final EnumSet<EndpointState> _localStates;
+        private final EnumSet<EndpointState> _remoteStates;
+
+        Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+        {
+            _source = connection;
+            _localStates = local;
+            _remoteStates = remote;
+        }
+
+        public java.util.Iterator<Link> iterator()
+        {
+            return new LinkIterator(_source, _localStates, _remoteStates);
+        }
+    }
+
+    /**
+     * Iterator over the links of a connection, selected by a pair of local
+     * and remote endpoint state sets. Starts at connection.linkHead(local,
+     * remote) and advances with Link.next(local, remote). Removal is not
+     * supported.
+     */
+    private static class LinkIterator implements java.util.Iterator<Link>
+    {
+        private final EnumSet<EndpointState> _local;
+        private final EnumSet<EndpointState> _remote;
+        private Link _next;
+
+        LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+        {
+            _local = local;
+            _remote = remote;
+            _next = connection.linkHead(_local, _remote);
+        }
+
+        public boolean hasNext()
+        {
+            return _next != null;
+        }
+
+        /**
+         * Returns the current link and advances to the following match.
+         *
+         * @throws java.util.NoSuchElementException if the iteration is exhausted
+         */
+        public Link next()
+        {
+            if (_next == null)
+            {
+                // honour the Iterator contract rather than dereferencing null
+                throw new java.util.NoSuchElementException();
+            }
+            Link current = _next;
+            _next = current.next(_local, _remote);
+            return current;
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Iterable over the sessions of a connection, selected by a pair of local
+     * and remote endpoint state sets. Each call to iterator() starts a fresh
+     * traversal.
+     */
+    private static class Sessions implements Iterable<Session>
+    {
+        private final Connection _source;
+        private final EnumSet<EndpointState> _localStates;
+        private final EnumSet<EndpointState> _remoteStates;
+
+        Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+        {
+            _source = connection;
+            _localStates = local;
+            _remoteStates = remote;
+        }
+
+        public java.util.Iterator<Session> iterator()
+        {
+            return new SessionIterator(_source, _localStates, _remoteStates);
+        }
+    }
+
+    /**
+     * Iterator over the sessions of a connection, selected by a pair of local
+     * and remote endpoint state sets. Starts at connection.sessionHead(local,
+     * remote) and advances with Session.next(local, remote). Removal is not
+     * supported.
+     */
+    private static class SessionIterator implements java.util.Iterator<Session>
+    {
+        private final EnumSet<EndpointState> _local;
+        private final EnumSet<EndpointState> _remote;
+        private Session _next;
+
+        SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+        {
+            _local = local;
+            _remote = remote;
+            _next = connection.sessionHead(_local, _remote);
+        }
+
+        public boolean hasNext()
+        {
+            return _next != null;
+        }
+
+        /**
+         * Returns the current session and advances to the following match.
+         *
+         * @throws java.util.NoSuchElementException if the iteration is exhausted
+         */
+        public Session next()
+        {
+            if (_next == null)
+            {
+                // honour the Iterator contract rather than dereferencing null
+                throw new java.util.NoSuchElementException();
+            }
+            Session current = _next;
+            _next = current.next(_local, _remote);
+            return current;
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Rewrites the message's reply-to relative to this messenger's name: a
+     * null or empty reply-to becomes "amqp://" + name, and one starting with
+     * "~/" has that prefix replaced by "amqp://" + name + "/". Any other
+     * value is left untouched.
+     */
+    private void adjustReplyTo(Message m)
+    {
+        final String base = "amqp://" + _name;
+        String replyTo = m.getReplyTo();
+        if (replyTo == null || replyTo.length() == 0)
+        {
+            m.setReplyTo(base);
+            return;
+        }
+        if (replyTo.startsWith("~/"))
+        {
+            m.setReplyTo(base + "/" + replyTo.substring(2));
+        }
+    }
+
+    /** A null target matches only the empty path; otherwise the path must equal the target's address. */
+    private static boolean matchTarget(Target target, String path)
+    {
+        return target == null ? path.isEmpty() : path.equals(target.getAddress());
+    }
+
+    /** A null source matches only the empty path; otherwise the path must equal the source's address. */
+    private static boolean matchSource(Source source, String path)
+    {
+        return source == null ? path.isEmpty() : path.equals(source.getAddress());
+    }
+
+    /** Returns 5671 for the "amqps" scheme and 5672 for anything else, including null. */
+    private static int defaultPort(String scheme)
+    {
+        return "amqps".equals(scheme) ? 5671 : 5672;
+    }
+}

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import org.apache.qpid.proton.messenger.Tracker;
+
+/**
+ * Tracker implementation: an immutable direction flag plus a sequence number.
+ */
+class TrackerImpl implements Tracker
+{
+    private final boolean _outgoing;
+    private final int _sequence;
+
+    TrackerImpl(boolean outgoing, int sequence)
+    {
+        _outgoing = outgoing;
+        _sequence = sequence;
+    }
+
+    boolean isOutgoing()
+    {
+        return _outgoing;
+    }
+
+    int getSequence()
+    {
+        return _sequence;
+    }
+
+    /** Renders as "O:" (outgoing) or "I:" (otherwise) followed by the sequence number. */
+    @Override
+    public String toString()
+    {
+        return (_outgoing ? "O:" : "I:") + _sequence;
+    }
+}

Added: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java?rev=1420140&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
 (added)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
 Tue Dec 11 13:11:00 2012
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+
+/**
+ * Window of deliveries addressed by tracker sequence number.
+ * <p>
+ * {@link #add(Delivery)} appends a delivery and advances the high water mark;
+ * {@link #slide()} settles and drops deliveries from the front, advancing the
+ * low water mark, while more than the configured window of deliveries is held.
+ * A tracker's sequence number minus the low water mark is the index of its
+ * delivery in the queue.
+ */
+class TrackerQueue
+{
+    private static final Accepted ACCEPTED = new Accepted();
+    private static final Rejected REJECTED = new Rejected();
+    private final ArrayList<Delivery> _deliveries = new ArrayList<Delivery>();
+    private int _window = 0;
+    private int _hwm = 0;
+    private int _lwm = 0;
+
+    /** Sets the number of deliveries retained; a negative value disables sliding. */
+    void setWindow(int window)
+    {
+        _window = window;
+    }
+
+    int getWindow()
+    {
+        return _window;
+    }
+
+    /** Accepts only the delivery the tracker refers to (no flags). */
+    void accept(Tracker tracker)
+    {
+        accept(tracker, 0);
+    }
+
+    /** Marks the tracked delivery (or, with CUMULATIVE, all up to it) as accepted. */
+    void accept(Tracker tracker, int flags)
+    {
+        apply(tracker, flags, ACCEPT);
+    }
+
+    /** Marks the tracked delivery (or, with CUMULATIVE, all up to it) as rejected. */
+    void reject(Tracker tracker, int flags)
+    {
+        apply(tracker, flags, REJECT);
+    }
+
+    /** Settles the tracked delivery (or, with CUMULATIVE, all up to it). */
+    void settle(Tracker tracker, int flags)
+    {
+        apply(tracker, flags, SETTLE);
+    }
+
+    /**
+     * Appends a delivery and advances the high water mark.
+     *
+     * @throws NullPointerException if delivery is null
+     */
+    void add(Delivery delivery)
+    {
+        if (delivery == null)
+        {
+            throw new NullPointerException("Cannot add null delivery!");
+        }
+        _hwm++;
+        _deliveries.add(delivery);
+    }
+
+    /**
+     * Reports the status of the delivery a tracker refers to.
+     *
+     * @return UNKNOWN when the tracker maps to no held delivery; otherwise the
+     *         status of the remote state if there is one, else of the local
+     *         state once the delivery is settled by either side, else PENDING
+     */
+    Status getStatus(Tracker tracker)
+    {
+        Delivery delivery = getDelivery(tracker);
+        if (delivery == null)
+        {
+            return Status.UNKNOWN;
+        }
+        DeliveryState remote = delivery.getRemoteState();
+        if (remote != null)
+        {
+            return getStatus(remote);
+        }
+        if (delivery.remotelySettled() || delivery.isSettled())
+        {
+            return getStatus(delivery.getLocalState());
+        }
+        return Status.PENDING;
+    }
+
+    /**
+     * Maps a delivery state onto a status.
+     *
+     * @throws RuntimeException for any state other than null, Accepted or Rejected
+     */
+    private Status getStatus(DeliveryState state)
+    {
+        if (state instanceof Accepted)
+        {
+            return Status.ACCEPTED;
+        }
+        else if (state instanceof Rejected)
+        {
+            return Status.REJECTED;
+        }
+        else if (state == null)
+        {
+            return Status.PENDING;
+        }
+        else
+        {
+            throw new RuntimeException("Unexpected disposition: " + state);
+        }
+    }
+
+    /**
+     * Settles and removes deliveries from the front of the queue while more
+     * than the window are held. Stops early at the first delivery that has no
+     * local state yet. Does nothing when the window is negative.
+     */
+    void slide()
+    {
+        if (_window < 0)
+        {
+            return;
+        }
+        while (_hwm - _lwm > _window)
+        {
+            if (_deliveries.isEmpty())
+            {
+                throw new RuntimeException("Inconsistent state, empty delivery queue but lwm="
+                                           + _lwm + " and hwm=" + _hwm);
+            }
+            Delivery oldest = _deliveries.get(0);
+            if (oldest.getLocalState() == null)
+            {
+                return;
+            }
+
+            oldest.settle();
+            _deliveries.remove(0);
+            _lwm++;
+        }
+    }
+
+    int getHighWaterMark()
+    {
+        return _hwm;
+    }
+
+    Iterator<Delivery> deliveries()
+    {
+        return _deliveries.iterator();
+    }
+
+    /** Looks up the delivery for a tracker; null when its sequence maps to no held delivery. */
+    private Delivery getDelivery(Tracker tracker)
+    {
+        int seq = ((TrackerImpl) tracker).getSequence();
+        if (seq < _lwm || seq > _hwm) return null;
+        int index = seq - _lwm;
+        return index < _deliveries.size() ? _deliveries.get(index) : null;
+    }
+
+    static boolean isOutgoing(Tracker tracker)
+    {
+        return ((TrackerImpl) tracker).isOutgoing();
+    }
+
+    /**
+     * Applies an operation to the tracked delivery, or with the CUMULATIVE flag
+     * to every held delivery up to and including it, then slides the window.
+     * Already settled deliveries are skipped; trackers outside the held range
+     * are ignored.
+     */
+    private void apply(Tracker tracker, int flags, DeliveryOperation operation)
+    {
+        int seq = ((TrackerImpl) tracker).getSequence();
+        if (seq < _lwm || seq > _hwm) return;
+        int last = seq - _lwm;
+        int start = (flags & Messenger.CUMULATIVE) != 0 ? 0 : last;
+        for (int i = start; i <= last && i < _deliveries.size(); ++i)
+        {
+            Delivery d = _deliveries.get(i);
+            if (d != null && !d.isSettled())
+            {
+                operation.apply(d);
+            }
+        }
+        slide();
+    }
+
+    private static interface DeliveryOperation
+    {
+        void apply(Delivery d);
+    }
+
+    // Each operation sets a local disposition only if none has been set yet.
+    private static final DeliveryOperation ACCEPT = new DeliveryOperation()
+        {
+            public void apply(Delivery d)
+            {
+                if (d.getLocalState() == null) d.disposition(ACCEPTED);
+            }
+        };
+    private static final DeliveryOperation REJECT = new DeliveryOperation()
+        {
+            public void apply(Delivery d)
+            {
+                if (d.getLocalState() == null) d.disposition(REJECTED);
+            }
+        };
+    private static final DeliveryOperation SETTLE = new DeliveryOperation()
+        {
+            public void apply(Delivery d)
+            {
+                if (d.getLocalState() == null)
+                {
+                    d.disposition(d.getRemoteState());
+                }
+                d.settle();
+            }
+        };
+}

Modified: qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py?rev=1420140&r1=1420139&r2=1420140&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py Tue Dec 11 
13:11:00 2012
@@ -22,7 +22,9 @@ from org.apache.qpid.proton.engine.impl 
     SenderImpl, ReceiverImpl, TransportImpl
 from org.apache.qpid.proton.message import Message as MessageImpl, \
     MessageFormat
-from org.apache.qpid.proton.type.messaging import Source, Target, Accepted
+from org.apache.qpid.proton.messenger import AcceptMode, MessengerException, 
Status
+from org.apache.qpid.proton.messenger.impl import MessengerImpl
+from org.apache.qpid.proton.type.messaging import Source, Target, Accepted, 
AmqpValue
 from org.apache.qpid.proton.type import UnsignedInteger
 from jarray import zeros
 from java.util import EnumSet, UUID as JUUID
@@ -32,6 +34,29 @@ class Skipped(Exception):
 
 PN_SESSION_WINDOW = TransportImpl.SESSION_WINDOW
 
+PENDING = "PENDING"
+ACCEPTED = "ACCEPTED"
+REJECTED = "REJECTED"
+
+STATUSES = {
+  Status.ACCEPTED: ACCEPTED,
+  Status.REJECTED: REJECTED,
+  Status.PENDING: PENDING,
+  Status.UNKNOWN: None
+  }
+
+MANUAL = "MANUAL"
+AUTOMATIC = "AUTOMATIC"
+
+_ACCEPT_MODE2CONST = {
+  AcceptMode.AUTO: AUTOMATIC,
+  AcceptMode.MANUAL: MANUAL
+  }
+_CONST2ACCEPT_MODE = {
+  AUTOMATIC: AcceptMode.AUTO,
+  MANUAL: AcceptMode.MANUAL
+  }
+
 class Endpoint(object):
 
   LOCAL_UNINIT = 1
@@ -228,7 +253,7 @@ class Link(Endpoint):
     return wrap_session(self.impl.getSession())
 
   def delivery(self, tag):
-    return wrap_delivery(self.impl.delivery(tag, 0, len(tag)))
+    return wrap_delivery(self.impl.delivery(tag))
 
   @property
   def current(self):
@@ -480,10 +505,95 @@ class Data(object):
   def __init__(self, *args, **kwargs):
     raise Skipped()
 
+class Timeout(Exception):
+  pass
+
 class Messenger(object):
 
   def __init__(self, *args, **kwargs):
-    raise Skipped()
+    #raise Skipped()
+    self.impl = MessengerImpl()
+
+  def start(self):
+    self.impl.start()
+
+  def stop(self):
+    self.impl.stop()
+
+  def subscribe(self, source):
+    self.impl.subscribe(source)
+
+  def put(self, message):
+    self.impl.put(message.impl)
+    return self.impl.outgoingTracker()
+
+  def send(self):
+    self.impl.send()
+
+  def recv(self, n):
+    self.impl.recv(n)
+
+  def get(self, message=None):
+    if message is None:
+      self.impl.get()
+    else:
+      message.impl = self.impl.get()
+    return self.impl.incomingTracker()
+
+  @property
+  def outgoing(self):
+    return self.impl.outgoing()
+
+  @property
+  def incoming(self):
+    return self.impl.incoming()
+
+  def _get_accept_mode(self):
+    return _ACCEPT_MODE2CONST[self.impl.getAcceptMode()]  # dict lookup: Java AcceptMode -> module constant
+  def _set_accept_mode(self, mode):
+    mode = _CONST2ACCEPT_MODE[mode]  # KeyError unless mode is AUTOMATIC or MANUAL
+    self.impl.setAcceptMode(mode)
+  accept_mode = property(_get_accept_mode, _set_accept_mode)
+
+  def accept(self, tracker=None):
+    if tracker is None:
+      tracker = self.impl.incomingTracker()
+      flags = self.impl.CUMULATIVE
+    else:
+      flags = 0
+    self.impl.accept(tracker, flags)
+
+  def reject(self, tracker=None):
+    if tracker is None:
+      tracker = self.impl.incomingTracker()
+      flags = self.impl.CUMULATIVE
+    else:
+      flags = 0
+    self.impl.reject(tracker, flags)
+
+  def settle(self, tracker=None):
+    if tracker is None:
+      tracker = self.impl.outgoingTracker()
+      flags = self.impl.CUMULATIVE
+    else:
+      flags = 0
+    self.impl.settle(tracker, flags)
+
+  def status(self, tracker):
+    return STATUSES[self.impl.getStatus(tracker)]
+
+  def _get_incoming_window(self):
+    return self.impl.getIncomingWindow()
+  def _set_incoming_window(self, window):
+    self.impl.setIncomingWindow(window)
+  incoming_window = property(_get_incoming_window, _set_incoming_window)
+
+  def _get_outgoing_window(self):
+    return self.impl.getOutgoingWindow()
+  def _set_outgoing_window(self, window):
+    self.impl.setOutgoingWindow(window)
+  outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
+
 
 class Message(object):
 
@@ -651,6 +761,17 @@ class Message(object):
     self.impl.setMessageFormat(format)
   format = property(_get_format, _set_format)
 
+  def _get_body(self):
+    body = self.impl.getBody()
+    if isinstance(body, AmqpValue):
+      return body.getValue()
+    else:
+      return body
+  def _set_body(self, body):
+    self.impl.setBody(AmqpValue(body))
+  body = property(_get_body, _set_body)
+
+
 class SASL(object):
 
   OK = Sasl.PN_SASL_OK
@@ -742,4 +863,6 @@ __all__ = ["Messenger", "Message", "Prot
            "MessageException", "Timeout", "Condition", "Data", "Endpoint",
            "Connection", "Session", "Link", "Terminus", "Sender", "Receiver",
            "Delivery", "Transport", "TransportException", "SASL", "SSL",
-           "SSLException", "SSLUnavailable", "PN_SESSION_WINDOW", "symbol"]
+           "SSLException", "SSLUnavailable", "PN_SESSION_WINDOW", "symbol",
+           "MANUAL", "AUTOMATIC", "PENDING", "ACCEPTED", "REJECTED"]
+



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to