Author: kgiusti
Date: Thu Jun 14 21:05:59 2012
New Revision: 1350403

URL: http://svn.apache.org/viewvc?rev=1350403&view=rev
Log:
Add mailbox example

Added:
    qpid/proton/trunk/examples/README.txt   (with props)
    qpid/proton/trunk/examples/mailbox/
    qpid/proton/trunk/examples/mailbox/README.txt   (with props)
    qpid/proton/trunk/examples/mailbox/fetch   (with props)
    qpid/proton/trunk/examples/mailbox/post   (with props)
    qpid/proton/trunk/examples/mailbox/server   (with props)

Added: qpid/proton/trunk/examples/README.txt
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/README.txt?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/README.txt (added)
+++ qpid/proton/trunk/examples/README.txt Thu Jun 14 21:05:59 2012
@@ -0,0 +1,3 @@
+This directory contains example applications that use the Proton library.
+
+mailbox/ - a toy mailbox client/server example

Propchange: qpid/proton/trunk/examples/README.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/proton/trunk/examples/mailbox/README.txt
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/README.txt?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/README.txt (added)
+++ qpid/proton/trunk/examples/mailbox/README.txt Thu Jun 14 21:05:59 2012
@@ -0,0 +1,41 @@
+This directory contains an example client/server application that uses the Proton library.
+The applications implement a simple mailbox server.  Clients "post" and "fetch" messages
+to named mailboxes on the server.
+
+Files:
+
+    server - the mailbox server.  This server listens for client requests on a well-known
+    address.  Clients may request to post a message to a mailbox, or fetch a message from
+    a mailbox.  When a message is posted, if the mailbox does not exist it is created and
+    the message is stored in it.  Should additional messages arrive for the mailbox, they
+    are queued in the order of arrival.  When a mailbox is fetched, the next (oldest)
+    message in the mailbox is removed from the mailbox and sent to the client.  If a
+    client attempts to fetch from a non-existent mailbox, a zero-length message is
+    returned.
+
+    post - a client that sends a message to a mailbox on the server.
+
+    fetch - a client that retrieves a message from a mailbox on the server.
+
+To run the example:
+
+    1) Start the server application.  You may specify the address the server should listen
+    on.  The default address is 0.0.0.0:5672.  The server application should be left
+    running for the following steps.
+
+    2) Post a message to the server using the 'post' application.  For example, the
+    following command would post the message "Hello World" to the mailbox "Mailbox-1" on
+    server 0.0.0.0:5672 :
+
+              post -m Mailbox-1 "Hello World"
+
+    Use the --help option for additional details.
+
+    3) Fetch a message from the server using the 'fetch' application.  For example, the
+    following command would fetch the message sent in the previous step:
+
+              fetch Mailbox-1
+
+    Use the --help option for additional details.
+
+    Once you are done running the example, you may stop the server application.

Propchange: qpid/proton/trunk/examples/mailbox/README.txt
------------------------------------------------------------------------------
    svn:eol-style = native
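The mailbox behavior described in the README above can be modeled without Proton at all. The following is a minimal sketch, not the server's actual code: the class name MailboxStore and its methods are hypothetical, and it follows the README text in returning a zero-length message for a missing (or empty) mailbox.

```python
from collections import deque


class MailboxStore:
    """Toy in-memory model of the mailbox rules (hypothetical, no networking)."""

    def __init__(self):
        # mailbox name -> deque of messages, oldest message first
        self._boxes = {}

    def post(self, mailbox, message):
        # Posting to an unknown mailbox creates it; later messages are
        # queued behind earlier ones in order of arrival.
        self._boxes.setdefault(mailbox, deque()).append(message)

    def fetch(self, mailbox):
        # Fetching removes and returns the oldest message.  A missing or
        # empty mailbox yields a zero-length message.
        box = self._boxes.get(mailbox)
        if not box:
            return ""
        return box.popleft()


store = MailboxStore()
store.post("Mailbox-1", "Hello World")
store.post("Mailbox-1", "Second message")
print(store.fetch("Mailbox-1"))
print(store.fetch("Mailbox-1"))
print(repr(store.fetch("No-Such-Mailbox")))
```

A deque is used here only because removing from the front of a list is linear in its length; the example scripts themselves use plain lists.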

Added: qpid/proton/trunk/examples/mailbox/fetch
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/fetch?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/fetch (added)
+++ qpid/proton/trunk/examples/mailbox/fetch Thu Jun 14 21:05:59 2012
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from optparse import OptionParser
+
+from cproton import *
+
+class Options(object):
+    def __init__(self):
+        parser = OptionParser(usage="usage: %prog [options] <mailbox>")
+        parser.add_option("-s", "--server", action="store", type="string",
+                          default="0.0.0.0:5672", metavar="<server-address>",
+                          help="Address of the server with syntax: hostname | ip-address [:<port>]")
+        parser.add_option("-c", "--count", action="store", type="int",
+                          default=1, metavar="<#messages>",
+                          help="Number of messages to read from mailbox")
+        parser.add_option("-v", "--verbose", action="store_true",
+                          help="Turn on extra trace messages.")
+
+        opts, mailboxes = parser.parse_args()   # uses sys.argv[1:]
+
+        self.mailbox = None
+        if len(mailboxes) == 1:
+            self.mailbox = str(mailboxes[0])
+        self.server = opts.server
+        addr = opts.server.rsplit(":", 1)
+        self.host = addr[0]
+        if len(addr) == 2:
+            self.port = addr[1]
+        else:
+            self.port = "5672"
+        self.count = opts.count
+        self.verbose = opts.verbose
+
+
+class FetchClient(object):
+    def __init__(self, host, port, mailbox):
+        """ Initialize the client by supplying the address of the server, and
+        the name of the mailbox to fetch from.
+        """
+        self.host = host
+        self.port = port
+        self.mailbox = mailbox
+        self.logging = False
+
+    def setup(self):
+        """ Setup and configure the connection to the server.
+        """
+        # setup a driver connection to the server
+
+        self.log("Connecting to server host = %s:%s" % (self.host, self.port))
+        self.driver = pn_driver();
+        self.cxtr = pn_connector(self.driver, self.host, self.port, None)
+
+        # configure SASL
+        self.sasl = pn_connector_sasl(self.cxtr)
+        pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
+        pn_sasl_client(self.sasl)
+
+        # inform the engine about the connection, and link the driver to it.
+        self.conn = pn_connection()
+        pn_connector_set_connection(self.cxtr, self.conn)
+
+        # create a session, and Link for receiving from the mailbox
+        self.log("Fetching from mailbox = %s" % self.mailbox)
+        self.ssn = pn_session(self.conn)
+        self.link = pn_receiver(self.ssn, "receiver")
+        pn_set_source(self.link, self.mailbox)
+
+        # now open all the engine endpoints
+        pn_connection_open(self.conn)
+        pn_session_open(self.ssn)
+        pn_link_open(self.link)
+
+
+    def wait(self):
+        """ Wait for an event to process.
+        """
+        self.log("Waiting for events...")
+
+        # prepare pending outbound data for the network
+        pn_connector_process(self.cxtr)
+
+        # wait forever for network event(s)
+        pn_driver_wait(self.driver, -1)
+
+        # process any data that arrived
+        pn_connector_process(self.cxtr)
+
+        self.log("...waiting done!")
+
+
+    def settle(self):
+        """ In order to be sure that the remote has seen that we accepted the
+        message, we need to wait until the message's delivery has been remotely
+        settled.  Once that occurs, we can release the delivery by settling it.
+        """
+        # locally settle any remotely settled deliveries
+        d = pn_unsettled_head(self.link)
+        while d and not pn_readable(d):    # can stop when we hit first
+            # delivery that has not yet been read
+            _next = pn_unsettled_next(d)
+            if pn_remote_settled(d):   # remote has seen our ack (or doesn't care)
+                pn_settle(d)   # now free it
+            d = _next
+
+
+    def enableLogging(self):
+        self.logging = True
+
+
+    def log(self, msg):
+        if self.logging:
+            print("%s" % msg)
+
+
+##################################################
+##################################################
+##################################################
+
+
+def main():
+    options = Options()
+
+    if not options.mailbox:
+        print("No mailbox name given!")
+        return -1
+
+    receiver = FetchClient(options.host,
+                           options.port,
+                           options.mailbox)
+    if options.verbose:
+        receiver.enableLogging()
+
+    receiver.setup()
+
+    # wait until we authenticate with the server
+    while pn_sasl_state(receiver.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
+        receiver.wait()
+
+    if pn_sasl_state(receiver.sasl) == PN_SASL_FAIL:
+        print("Error: Authentication failure")
+        return -1
+
+    # wait until the server has opened the link
+    while not (pn_link_state(receiver.link) & PN_REMOTE_ACTIVE):
+        receiver.wait()
+
+    # check if the server recognizes the mailbox, fail if it does not
+    if pn_remote_source(receiver.link) != options.mailbox:
+        print("Error: mailbox %s does not exist!" % options.mailbox)
+        return -2
+
+    # Allow the server to send "count" messages to the receiver by setting
+    # the credit to the expected count
+    pn_flow(receiver.link, options.count)
+
+    # main loop: continue fetching messages until 'count' messages have been
+    # retrieved
+
+    while pn_credit(receiver.link) > 0:    # while all msgs have not arrived
+        if pn_queued(receiver.link) == 0:  # wait for some to arrive
+            receiver.wait()
+
+        # read all queued deliveries
+        while pn_queued(receiver.link):
+            delivery = pn_current(receiver.link)
+            # read all bytes of message
+            rc, msg = pn_recv(receiver.link, pn_pending(delivery))
+            receiver.log("Received count/status=%d" % rc)
+            if rc < 0:
+                print("Error: Receive failed (%s), exiting..." % rc)
+                return -3
+            print("%s" % msg)
+            # let the server know we accept the message
+            pn_disposition(delivery, PN_ACCEPTED)
+            pn_advance(receiver.link)      # next delivery
+
+        # settle any deliveries that the server has seen
+        receiver.settle()
+
+    # block until any leftover deliveries are settled
+    while pn_unsettled(receiver.link) > 0:
+        receiver.wait()
+        receiver.settle()
+
+    # we're done, close and wait for the remote to close also
+    pn_connection_close(receiver.conn)
+    while not (pn_connection_state(receiver.conn) & PN_REMOTE_CLOSED):
+        receiver.wait()
+    return 0
+
+
+if __name__ == "__main__":
+        sys.exit(main())
+
+
+
+
+
+

Propchange: qpid/proton/trunk/examples/mailbox/fetch
------------------------------------------------------------------------------
    svn:executable = *
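All three scripts parse the server address the same way: split on the last colon and fall back to port 5672 when no port is given. A minimal standalone sketch of that logic follows; the function name split_address and the constant DEFAULT_PORT are hypothetical, and the port is kept as a string as in the scripts.

```python
DEFAULT_PORT = "5672"


def split_address(address, default_port=DEFAULT_PORT):
    """Split 'host[:port]' into (host, port) using the same rsplit(":", 1)
    approach as the Options classes in the example scripts."""
    parts = address.rsplit(":", 1)
    host = parts[0]
    # rsplit yields two parts only when the address contained a colon
    port = parts[1] if len(parts) == 2 else default_port
    return host, port


print(split_address("0.0.0.0:5672"))
print(split_address("localhost"))
```

Because the split is on the last colon, a bare IPv6 literal such as "::1" would be mis-split; handling that case is outside what the example attempts.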

Added: qpid/proton/trunk/examples/mailbox/post
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/post?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/post (added)
+++ qpid/proton/trunk/examples/mailbox/post Thu Jun 14 21:05:59 2012
@@ -0,0 +1,207 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from optparse import OptionParser
+
+from cproton import *
+
+
+class Options(object):
+    def __init__(self):
+        parser = OptionParser(usage="usage: %prog [options] <message-string> [<message-string> ...]")
+        parser.add_option("-s", "--server", action="store", type="string",
+                          default="0.0.0.0:5672", metavar="<server-address>",
+                          help="Address of the server with syntax: hostname | ip-address [:<port>]")
+        parser.add_option("-m", "--mailbox", action="store", type="string",
+                          default="mailbox", metavar="<mailbox-address>",
+                          help="Name of the mailbox on the server.")
+        parser.add_option("-v", "--verbose", action="store_true",
+                          help="Turn on extra trace messages.")
+
+        opts, self.messages = parser.parse_args()   # uses sys.argv[1:]
+
+        self.server = opts.server
+        self.mailbox = opts.mailbox
+
+        addr = opts.server.rsplit(":", 1)
+        self.host = addr[0]
+        if len(addr) == 2:
+            self.port = addr[1]
+        else:
+            self.port = "5672"
+        self.verbose = opts.verbose
+
+
+class PostClient(object):
+    def __init__(self, host, port, mailbox):
+        """ Initialize the client by supplying the address of the server, and
+        the name of the mailbox to post to.
+        """
+        self.host = host
+        self.port = port
+        self.mailbox = mailbox
+        self.logging = False
+
+    def setup(self):
+        """ Setup and configure the connection to the server.
+        """
+        # setup a driver connection to the server
+
+        self.log("Connecting to server host = %s:%s" % (self.host, self.port))
+        self.driver = pn_driver();
+        self.cxtr = pn_connector(self.driver, self.host, self.port, None)
+
+        # configure SASL
+        self.sasl = pn_connector_sasl(self.cxtr)
+        pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
+        pn_sasl_client(self.sasl)
+
+        # inform the engine about the connection, and link the driver to it.
+        self.conn = pn_connection()
+        pn_connector_set_connection(self.cxtr, self.conn)
+
+        # create a session, and Link for sending to the mailbox
+        self.log("Posting to mailbox = %s" % self.mailbox)
+        self.ssn = pn_session(self.conn)
+        self.link = pn_sender(self.ssn, "sender")
+        pn_set_target(self.link, self.mailbox)
+
+        # now open all the engine endpoints
+        pn_connection_open(self.conn)
+        pn_session_open(self.ssn)
+        pn_link_open(self.link)
+
+
+    def wait(self):
+        """ Wait for an event to process.
+        """
+        self.log("Waiting for events...")
+
+        # prepare pending outbound data for the network
+        pn_connector_process(self.cxtr)
+
+        # wait forever for network event(s)
+        pn_driver_wait(self.driver, -1)
+
+        # process any data that arrived
+        pn_connector_process(self.cxtr)
+
+        self.log("...waiting done!")
+
+
+    def settle(self):
+        """ In order to be sure that the remote has accepted the message, we
+        need to wait until the message's delivery has been remotely settled.
+        Once that occurs, we can release the delivery by settling it.
+        """
+        d = pn_unsettled_head(self.link)
+        while d:
+            _next = pn_unsettled_next(d)
+            # if the remote has either settled this delivery OR set the
+            # disposition, we consider the message received.
+            disp = pn_remote_disp(d)
+            if disp and disp != PN_ACCEPTED:
+                print("Warning: message was not accepted by the remote!")
+            if disp or pn_remote_settled(d):
+                pn_settle(d)
+            d = _next
+
+
+    def enableLogging(self):
+        self.logging = True
+
+
+    def log(self, msg):
+        if self.logging:
+            print("%s" % msg)
+
+
+##################################################
+##################################################
+##################################################
+
+
+def main():
+    options = Options()
+
+    if len(options.messages) == 0:
+        print("No message data given!")
+        return -1
+
+    sender = PostClient(options.host,
+                        options.port,
+                        options.mailbox)
+    if options.verbose:
+        sender.enableLogging()
+
+    sender.setup()
+
+    # wait until we authenticate with the server
+    while pn_sasl_state(sender.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
+        sender.wait()
+
+    if pn_sasl_state(sender.sasl) == PN_SASL_FAIL:
+        print("Error: Authentication failure")
+        return -1
+
+    # main loop: send each message
+
+    pendingSends = list(options.messages)
+    while pendingSends:
+        # wait until the server grants us some send credit
+        if pn_credit(sender.link) == 0:
+            sender.log("wait for credit")
+            sender.wait()
+
+        while pendingSends and pn_credit(sender.link) > 0:
+            msg = pendingSends.pop(0)
+            sender.log("sending %s" % msg)
+            d = pn_delivery(sender.link, "post-delivery-%s" % len(pendingSends))
+            rc = pn_send(sender.link, msg)
+            if (rc < 0):
+                print("Error: sending message: %s" % rc)
+                return -2
+            assert rc == len(msg)
+            pn_advance(sender.link)  # deliver the message
+
+        # settle any deliveries that the server has accepted
+        sender.settle()
+
+    # done sending, now block until any pending deliveries are settled
+    while pn_unsettled(sender.link) > 0:
+        sender.wait()
+        sender.settle()
+
+    # we're done, close and wait for the remote to close also
+    pn_connection_close(sender.conn)
+    while not (pn_connection_state(sender.conn) & PN_REMOTE_CLOSED):
+        sender.wait()
+    return 0
+
+
+if __name__ == "__main__":
+        sys.exit(main())
+
+
+
+
+
+

Propchange: qpid/proton/trunk/examples/mailbox/post
------------------------------------------------------------------------------
    svn:executable = *
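The post client may only send while the link holds credit granted by the server. The sketch below models that rule with a plain integer instead of a Proton link; the function name send_with_credit is hypothetical, and it assumes each message consumes one unit of credit.

```python
def send_with_credit(pending, credit):
    """Send messages from `pending` while credit remains.

    Returns (sent, remaining_credit).  `pending` is consumed in place.
    """
    sent = []
    # Stop when either the credit or the list of messages is exhausted.
    while pending and credit > 0:
        sent.append(pending.pop(0))
        credit -= 1  # each transfer uses one unit of credit
    return sent, credit


queue = ["a", "b", "c"]
print(send_with_credit(queue, 2))   # credit runs out before the queue does
print(send_with_credit(queue, 5))   # the queue runs out before the credit does
```

The guard on `pending` keeps the loop from popping an empty list when the server grants more credit than there are messages left to send.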

Added: qpid/proton/trunk/examples/mailbox/server
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/server?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/server (added)
+++ qpid/proton/trunk/examples/mailbox/server Thu Jun 14 21:05:59 2012
@@ -0,0 +1,373 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from optparse import OptionParser
+
+from cproton import *
+
+
+FAILED = 0
+CONNECTION_UP = 1
+AUTHENTICATING = 2
+
+counter = 0
+mailboxes = {}
+
+
+class Options(object):
+    def __init__(self):
+        parser = OptionParser(usage="usage: %prog [options] <server-address>")
+        parser.add_option("-v", "--verbose",
+                          action="store_true", dest="verbose", default=False,
+                          help="print status messages to stdout")
+
+        opts, self.server = parser.parse_args()   # uses sys.argv[1:]
+        self.verbose = opts.verbose
+
+        if self.server:
+            addr = self.server[0].rsplit(":", 1)
+            self.host = addr[0]
+            if len(addr) == 2:
+                self.port = addr[1]
+            else:
+                self.port = "5672"
+        else:
+            self.host = "0.0.0.0"
+            self.port = "5672"
+
+
+class MailboxServer(object):
+    def __init__(self, host, port):
+        """ Initialize the server to wait on the given address for inbound
+        connection requests.
+        """
+        self.host = host
+        self.port = port
+        self.mailboxes = {}
+        self.logging = False   # enabled via enableLogging(); read by log()
+        self.counter = 0
+
+
+    def setup(self):
+        """ Setup and configure the server
+        """
+        self.log("Server started, listening on %s:%s" % (self.host, self.port))
+        self.driver = pn_driver();
+        self.listener = pn_listener(self.driver, self.host, self.port, None)
+        if self.listener is None:
+            print("Error: could not listen on %s:%s" % (self.host, self.port))
+            return False
+        return True
+
+
+    def wait(self):
+        """ Wait for a network event from the driver
+        """
+        self.log("Driver sleep...")
+        pn_driver_wait(self.driver, -1)
+        self.log("...Driver wakeup.")
+
+
+    def acceptConnectionRequests(self):
+        """ Accept connection request coming from remote clients.  Create a
+        connector to track each accepted network connection.
+        """
+        l = pn_driver_listener(self.driver)
+        while l:
+            self.log("Accepting Connection.")
+            cxtr = pn_listener_accept(l)
+            pn_connector_set_context(cxtr, AUTHENTICATING)
+            l = pn_driver_listener(self.driver)
+
+
+    def processConnections(self):
+        """ Check each connector for pending work.
+        """
+        cxtr = pn_driver_connector(self.driver)
+        while cxtr:
+            self.log("Process Connector")
+
+            # process any data coming from the network, this will update the
+            # engine's view of the state of the remote clients
+            pn_connector_process(cxtr)
+
+            state = pn_connector_context(cxtr)
+            if state == AUTHENTICATING:
+                # connection has not passed SASL authentication yet
+                self.authenticateConnector(cxtr)
+            elif state == CONNECTION_UP:
+                # active connection, service any engine events
+                self.serviceConnector(cxtr)
+            else:
+                print("Error: Unknown Connection state=%s" % state)
+
+            # now generate any outbound network data generated in response to
+            # any work done by the engine.
+            pn_connector_process(cxtr)
+
+            if pn_connector_closed(cxtr):
+                self.log("Closing connector")
+                pn_connector_destroy(cxtr)
+
+            cxtr = pn_driver_connector(self.driver)
+
+
+    def enableLogging(self):
+        self.logging = True
+
+
+    def log(self, msg):
+        if self.logging:
+            print("%s" % msg)
+
+
+    def authenticateConnector(self, cxtr):
+        """ Authenticate the remote client.  Run the SASL algorithm until it
+        passes, fails, or needs more data from the remote.
+        """
+        self.log("Authenticating...")
+        sasl = pn_connector_sasl(cxtr)
+        state = pn_sasl_state(sasl)
+        while state == PN_SASL_CONF or state == PN_SASL_STEP:
+            if state == PN_SASL_CONF:
+                self.log("Authenticating-CONF...")
+                pn_sasl_mechanisms(sasl, "ANONYMOUS")
+                pn_sasl_server(sasl)
+            elif state == PN_SASL_STEP:
+                self.log("Authenticating-STEP...")
+                mech = pn_sasl_remote_mechanisms(sasl)
+                if mech == "ANONYMOUS":
+                    pn_sasl_done(sasl, PN_SASL_OK)
+                else:
+                    pn_sasl_done(sasl, PN_SASL_AUTH)
+            state = pn_sasl_state(sasl)
+
+        if state == PN_SASL_PASS:
+            pn_connector_set_connection(cxtr, pn_connection());
+            pn_connector_set_context(cxtr, CONNECTION_UP)
+            self.log("Authentication-PASSED")
+        elif state == PN_SASL_FAIL:
+            pn_connector_set_context(cxtr, FAILED)
+            self.log("Authentication-FAILED")
+        else:
+            self.log("Authentication-PENDING")
+
+
+    def setupLink(self, link):
+        """ Configure a link coming from a client.
+        """
+        r_tgt = pn_remote_target(link)
+        r_src = pn_remote_source(link)
+
+        if pn_is_sender(link):
+            self.log("Opening Link to read from mailbox: %s" % r_src)
+            if r_src not in self.mailboxes:
+                print("Error: mailbox %s does not exist!" % r_src)
+                r_src = None  # indicate to remote the mailbox does not exist
+        else:
+            self.log("Opening Link to write to mailbox: %s" % r_tgt)
+            if r_tgt not in self.mailboxes:
+                self.mailboxes[r_tgt] = []     # create a new mailbox
+
+        pn_set_target(link, r_tgt)
+        pn_set_source(link, r_src)
+
+        if pn_is_sender(link):
+            # grant a delivery to the link - it will become "writable" when the
+            # driver can accept messages for the sender.
+            pn_delivery(link, "server-delivery-%d" % self.counter)
+            self.counter += 1
+        else:
+            # Grant enough credit to the receiver to allow one inbound message
+            pn_flow(link, 1)
+
+        pn_link_open(link)
+
+
+    def serviceConnector(self, cxtr):
+        """ Process any pending I/O events on the given connector once it has been
+        authenticated.
+        """
+        self.log("I/O processing start.")
+
+        # get the engine's connection from the driver
+        conn = pn_connector_connection(cxtr)
+
+        ## Step 1: setup the engine's connection, and any sessions and links
+        ## that may be pending.
+
+        # initialize the connection if it's new
+        if pn_connection_state(conn) & PN_LOCAL_UNINIT:
+            self.log("Connection Opened.")
+            pn_connection_open(conn)
+
+        # open all pending sessions
+        ssn = pn_session_head(conn, PN_LOCAL_UNINIT)
+        while ssn:
+            pn_session_open(ssn)
+            self.log("Session Opened.")
+            ssn = pn_session_next(ssn, PN_LOCAL_UNINIT)
+
+        # configure and open any pending links
+        link = pn_link_head(conn, PN_LOCAL_UNINIT);
+        while link:
+            self.setupLink(link)
+            link = pn_link_next(link, PN_LOCAL_UNINIT);
+
+        ## Step 2: Now drain all the pending deliveries from the connection's
+        ## work queue and process them
+
+        delivery = pn_work_head(conn)
+        while delivery:
+            self.log("Process delivery %s." % pn_delivery_tag(delivery))
+
+            if pn_readable(delivery):   # inbound data available
+                self.processReceive(delivery)
+            elif pn_writable(delivery): # can send a message
+                self.sendMessage(delivery)
+
+            if pn_updated(delivery):
+                # check to see if the remote has accepted message we sent
+                self.log("remote disposition for %s: %s " % (str(pn_delivery_tag(delivery)),
+                                                             str(pn_remote_disp(delivery))))
+                if pn_remote_disp(delivery):
+                    # once we know the remote has seen the message, we can
+                    # release the delivery.
+                    pn_settle(delivery)
+
+            delivery = pn_work_next(delivery)
+
+        ## Step 3: Clean up any links or sessions that have been closed by the
+        ## remote.  If the connection has been closed remotely, clean that up
+        ## also.
+
+        # teardown any terminating links
+        link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+        while link:
+            pn_link_close(link)
+            self.log("Link Closed")
+            link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+
+        # teardown any terminating sessions
+        ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+        while ssn:
+            pn_session_close(ssn)
+            self.log("Session Closed")
+            ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+
+        # teardown the connection if it's terminating
+        if pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED):
+            self.log("Connection Closed")
+            pn_connection_close(conn)
+
+
+    def processReceive(self, delivery):
+        """ A delivery has indicated that message data is available.  Read the
+        message data, process it, then accept the message.
+        """
+        link = pn_link(delivery)
+        mbox = pn_remote_target(link)
+        rc, msg = pn_recv(link, 1024);
+        self.log("Msg Received %d" % rc);
+        while rc >= 0:
+            if mbox not in self.mailboxes:
+                print("Error: cannot send to mailbox %s - dropping message." % mbox)
+            else:
+                self.mailboxes[mbox].append(msg)
+                self.log("Mailbox %s contains: %s" % (mbox, str(self.mailboxes[mbox])))
+            rc, msg = pn_recv(link, 1024);
+
+        # now that we hit the end of the message, update the
+        # disposition to let the remote know we accepted it.
+        self.log("Msg Accepted.");
+        pn_disposition(delivery, PN_ACCEPTED)
+
+        # since we no longer have any more work to do on this delivery, finish
+        # it and move to the next.
+        pn_settle(delivery)
+        pn_advance(link)
+
+        # if more credit is needed, grant it
+        if pn_credit(link) == 0:
+            pn_flow(link, 1)
+
+
+    def sendMessage(self, delivery):
+        """ A delivery has indicated that the link is able to accept a
+        message.  Send a message over the link, but do not settle it until the
+        remote accepts it (and updates the delivery's disposition).
+        """
+        link = pn_link(delivery)
+        mbox = pn_remote_source(link)
+        self.log("Request for Mailbox=%s" % str(mbox))
+        if mbox in self.mailboxes and self.mailboxes[mbox]:
+            msg = self.mailboxes[mbox].pop(0)
+            self.log("Fetching message %s" % str(msg))
+        else:
+            print("Warning: mailbox %s is empty, sending empty message." % mbox)
+            msg = ""
+        sent = pn_send(link, msg)
+        assert(sent == len(msg))
+        self.log("Msg Sent %d" % sent);
+
+        # if the link can accept more ???RAFI, is that correct????, grant
+        # another delivery
+        if pn_advance(link):
+            pn_delivery(link, "server-delivery-%d" % self.counter)
+            self.counter += 1
+
+        # do not settle the delivery now - wait until the remote sets the disposition.
+
+
+##################################################
+##################################################
+##################################################
+
+
+def main():
+    options = Options()
+
+    server = MailboxServer(options.host, options.port)
+    if (options.verbose):
+        server.enableLogging()
+
+    if not server.setup():
+        return -1;
+
+    while True:
+        # wait for a driver event
+        server.wait()
+
+        # accept all pending connection requests
+        server.acceptConnectionRequests()
+
+        # process all connectors with pending events
+        server.processConnections()
+
+
+if __name__ == "__main__":
+        sys.exit(main())
+
+
+
+
+
+

Propchange: qpid/proton/trunk/examples/mailbox/server
------------------------------------------------------------------------------
    svn:executable = *
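The server tags each connector with one of three states and moves it out of AUTHENTICATING once SASL passes or fails. The sketch below shows only that transition rule; the function next_state and the string outcomes "pass", "fail", and "pending" are hypothetical stand-ins for the SASL result, not Proton calls.

```python
FAILED = 0
CONNECTION_UP = 1
AUTHENTICATING = 2


def next_state(state, sasl_outcome):
    """Return the connector's next state.

    `sasl_outcome` is a hypothetical stand-in for the SASL result:
    "pass", "fail", or "pending".
    """
    if state != AUTHENTICATING:
        return state  # only authenticating connectors change state here
    if sasl_outcome == "pass":
        return CONNECTION_UP
    if sasl_outcome == "fail":
        return FAILED
    return AUTHENTICATING  # still waiting for more data from the remote


state = AUTHENTICATING
for outcome in ("pending", "pass"):
    state = next_state(state, outcome)
print(state == CONNECTION_UP)
```

Keeping the transition in one small function makes it easy to see that a connector never leaves CONNECTION_UP or FAILED through authentication alone.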


