Author: kgiusti
Date: Thu Jun 14 21:05:59 2012
New Revision: 1350403
URL: http://svn.apache.org/viewvc?rev=1350403&view=rev
Log:
Add mailbox example
Added:
qpid/proton/trunk/examples/README.txt (with props)
qpid/proton/trunk/examples/mailbox/
qpid/proton/trunk/examples/mailbox/README.txt (with props)
qpid/proton/trunk/examples/mailbox/fetch (with props)
qpid/proton/trunk/examples/mailbox/post (with props)
qpid/proton/trunk/examples/mailbox/server (with props)
Added: qpid/proton/trunk/examples/README.txt
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/README.txt?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/README.txt (added)
+++ qpid/proton/trunk/examples/README.txt Thu Jun 14 21:05:59 2012
@@ -0,0 +1,3 @@
+This directory contains example applications that use the Proton library.
+
+mailbox/ - a toy mailbox client/server example
Propchange: qpid/proton/trunk/examples/README.txt
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/proton/trunk/examples/mailbox/README.txt
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/README.txt?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/README.txt (added)
+++ qpid/proton/trunk/examples/mailbox/README.txt Thu Jun 14 21:05:59 2012
@@ -0,0 +1,41 @@
+This directory contains an example client/server application that uses the Proton library.
+The applications implement a simple mailbox server. Clients "post" and "fetch" messages
+to named mailboxes on the server.
+
+Files:
+
+ server - the mailbox server. This server listens for client requests on a well-known
+ address. Clients may request to post a message to a mailbox, or fetch a message from
+ a mailbox. When a message is posted, if the mailbox does not exist it is created and
+ the message is stored in it. Should additional messages arrive for the mailbox, they
+ are queued in the order of arrival. When a mailbox is fetched, the next (oldest)
+ message in the mailbox is removed from the mailbox and sent to the client. If a
+ client attempts to fetch from a non-existent mailbox, a zero-length message is
+ returned.
+
+ post - a client that sends a message to a mailbox on the server.
+
+ fetch - a client that retrieves a message from a mailbox on the server.
+
+To run the example:
+
+ 1) Start the server application. You may specify the address the server should listen
+ on. The default address is 0.0.0.0:5672. The server application should be left
+ running for the following steps.
+
+ 2) Post a message to the server using the 'post' application. For example, the
+ following command would post the message "Hello World" to the mailbox "Mailbox-1" on
+ server 0.0.0.0:5672:
+
+ post -m Mailbox-1 "Hello World"
+
+ use the --help option for additional details.
+
+ 3) Fetch a message from the server using the 'fetch' application. For example, the
+ following command would fetch the message sent in the previous step:
+
+ fetch Mailbox-1
+
+ use the --help option for additional details.
+
+ Once you are done running the example, you may stop the server application.
Propchange: qpid/proton/trunk/examples/mailbox/README.txt
------------------------------------------------------------------------------
svn:eol-style = native
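
The mailbox behaviour described in the README above (create on first post, queue in arrival order, return the oldest message on fetch, zero-length message when nothing is available) can be sketched without any networking. This is a toy in-memory model only; the `MailboxStore` class and its method names are hypothetical and do not appear in the commit.

```python
from collections import deque


class MailboxStore:
    """Toy in-memory model of the mailbox semantics described in the README.

    Hypothetical helper for illustration; not part of the committed example.
    """

    def __init__(self):
        # mailbox name -> deque of messages, oldest first
        self._boxes = {}

    def post(self, name, message):
        # Create the mailbox on first post, then queue in arrival order.
        self._boxes.setdefault(name, deque()).append(message)

    def fetch(self, name):
        # Remove and return the oldest message. A missing or empty mailbox
        # yields a zero-length message, following the README's description.
        box = self._boxes.get(name)
        if not box:
            return ""
        return box.popleft()


store = MailboxStore()
store.post("Mailbox-1", "Hello World")
store.post("Mailbox-1", "Second message")
print(store.fetch("Mailbox-1"))
print(store.fetch("Mailbox-1"))
print(repr(store.fetch("no-such-box")))
```

The committed server keeps a plain list per mailbox and uses `pop(0)`; a `deque` is used here only because it removes from the front without shifting the remaining items.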
Added: qpid/proton/trunk/examples/mailbox/fetch
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/fetch?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/fetch (added)
+++ qpid/proton/trunk/examples/mailbox/fetch Thu Jun 14 21:05:59 2012
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from optparse import OptionParser
+
+from cproton import *
+
+class Options(object):
+ def __init__(self):
+ parser = OptionParser(usage="usage: %prog [options] <mailbox>")
+ parser.add_option("-s", "--server", action="store", type="string",
+ default="0.0.0.0:5672", metavar="<server-address>",
+ help="Address of the server with syntax: hostname | ip-address [:<port>]")
+ parser.add_option("-c", "--count", action="store", type="int",
+ default=1, metavar="<#messages>",
+ help="Number of messages to read from mailbox")
+ parser.add_option("-v", "--verbose", action="store_true",
+ help="Turn on extra trace messages.")
+
+ opts, mailboxes = parser.parse_args() # uses sys.argv[1:]
+
+ self.mailbox = None
+ if len(mailboxes) == 1:
+ self.mailbox = str(mailboxes[0])
+ self.server = opts.server
+ addr = opts.server.rsplit(":", 1)
+ self.host = addr[0]
+ if len(addr) == 2:
+ self.port = addr[1]
+ else:
+ self.port = "5672"
+ self.count = opts.count
+ self.verbose = opts.verbose
+
+
+class FetchClient(object):
+ def __init__(self, host, port, mailbox):
+ """ Initialize the client by supplying the address of the server, and
+ the name of the mailbox to fetch from.
+ """
+ self.host = host
+ self.port = port
+ self.mailbox = mailbox
+ self.logging = False
+
+ def setup(self):
+ """ Setup and configure the connection to the server.
+ """
+ # setup a driver connection to the server
+
+ self.log("Connecting to server host = %s:%s" % (self.host, self.port))
+ self.driver = pn_driver();
+ self.cxtr = pn_connector(self.driver, self.host, self.port, None)
+
+ # configure SASL
+ self.sasl = pn_connector_sasl(self.cxtr)
+ pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
+ pn_sasl_client(self.sasl)
+
+ # inform the engine about the connection, and link the driver to it.
+ self.conn = pn_connection()
+ pn_connector_set_connection(self.cxtr, self.conn)
+
+ # create a session, and Link for receiving from the mailbox
+ self.log("Fetching from mailbox = %s" % self.mailbox)
+ self.ssn = pn_session(self.conn)
+ self.link = pn_receiver(self.ssn, "receiver")
+ pn_set_source(self.link, self.mailbox)
+
+ # now open all the engine endpoints
+ pn_connection_open(self.conn)
+ pn_session_open(self.ssn)
+ pn_link_open(self.link)
+
+
+ def wait(self):
+ """ Wait for an event to process.
+ """
+ self.log("Waiting for events...")
+
+ # prepare pending outbound data for the network
+ pn_connector_process(self.cxtr)
+
+ # wait forever for network event(s)
+ pn_driver_wait(self.driver, -1)
+
+ # process any data that arrived
+ pn_connector_process(self.cxtr)
+
+ self.log("...waiting done!")
+
+
+ def settle(self):
+ """ In order to be sure that the remote has seen that we accepted the
+ message, we need to wait until the message's delivery has been remotely
+ settled. Once that occurs, we can release the delivery by settling it.
+ """
+ # locally settle any remotely settled deliveries
+ d = pn_unsettled_head(self.link)
+ while d and not pn_readable(d): # can stop when we hit first
+ # delivery that has not yet been read
+ _next = pn_unsettled_next(d)
+ if pn_remote_settled(d): # remote has seen our ack (or doesn't care)
+ pn_settle(d) # now free it
+ d = _next
+
+
+ def enableLogging(self):
+ self.logging = True
+
+
+ def log(self, msg):
+ if self.logging:
+ print("%s" % msg)
+
+
+##################################################
+##################################################
+##################################################
+
+
+def main():
+ options = Options()
+
+ if not options.mailbox:
+ print("No mailbox name given!")
+ return -1
+
+ receiver = FetchClient(options.host,
+ options.port,
+ options.mailbox)
+ if options.verbose:
+ receiver.enableLogging()
+
+ receiver.setup()
+
+ # wait until we authenticate with the server
+ while pn_sasl_state(receiver.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
+ receiver.wait()
+
+ if pn_sasl_state(receiver.sasl) == PN_SASL_FAIL:
+ print("Error: Authentication failure")
+ return -1
+
+ # wait until the server has opened its end of the link
+ while not (pn_link_state(receiver.link) & PN_REMOTE_ACTIVE):
+ receiver.wait()
+
+ # check if the server recognizes the mailbox, fail if it does not
+ if pn_remote_source(receiver.link) != options.mailbox:
+ print("Error: mailbox %s does not exist!" % options.mailbox)
+ return -2
+
+ # Allow the server to send "count" messages to the receiver by setting
+ # the credit to the expected count
+ pn_flow(receiver.link, options.count)
+
+ # main loop: continue fetching messages until 'count' messages have been
+ # retrieved
+
+ while pn_credit(receiver.link) > 0: # while all msgs have not arrived
+ if pn_queued(receiver.link) == 0: # wait for some to arrive
+ receiver.wait()
+
+ # read all queued deliveries
+ while pn_queued(receiver.link):
+ delivery = pn_current(receiver.link)
+ # read all bytes of message
+ rc, msg = pn_recv(receiver.link, pn_pending(delivery))
+ receiver.log("Received count/status=%d" % rc)
+ if rc < 0:
+ print("Error: Receive failed (%s), exiting..." % rc)
+ return -3
+ print("%s" % msg)
+ # let the server know we accept the message
+ pn_disposition(delivery, PN_ACCEPTED)
+ pn_advance(receiver.link) # next delivery
+
+ # settle any deliveries that the server has seen
+ receiver.settle()
+
+ # block until any leftover deliveries are settled
+ while pn_unsettled(receiver.link) > 0:
+ receiver.wait()
+ receiver.settle()
+
+ # we're done, close and wait for the remote to close also
+ pn_connection_close(receiver.conn)
+ while not (pn_connection_state(receiver.conn) & PN_REMOTE_CLOSED):
+ receiver.wait()
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
+
+
+
+
+
+
Propchange: qpid/proton/trunk/examples/mailbox/fetch
------------------------------------------------------------------------------
svn:executable = *
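
All three scripts split the server address the same way: `rsplit(":", 1)` separates an optional port from the host, and "5672" is used when no port is given. A minimal standalone sketch of that logic follows; the function name `split_address` is an assumption introduced here, not something defined in the commit.

```python
def split_address(address, default_port="5672"):
    """Split 'host[:port]' into (host, port) strings.

    Mirrors the rsplit-based parsing in the Options classes. The port is
    kept as a string, as in the scripts. Hypothetical helper name.
    """
    parts = address.rsplit(":", 1)
    host = parts[0]
    port = parts[1] if len(parts) == 2 else default_port
    return host, port


print(split_address("0.0.0.0:5672"))
print(split_address("localhost"))
```

Because the split is on the last colon, a bare IPv6 literal would not be parsed correctly by this approach; the example only targets hostnames and IPv4 addresses.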
Added: qpid/proton/trunk/examples/mailbox/post
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/post?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/post (added)
+++ qpid/proton/trunk/examples/mailbox/post Thu Jun 14 21:05:59 2012
@@ -0,0 +1,207 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from optparse import OptionParser
+
+from cproton import *
+
+
+class Options(object):
+ def __init__(self):
+ parser = OptionParser(usage="usage: %prog [options] <message-string> [<message-string> ...]")
+ parser.add_option("-s", "--server", action="store", type="string",
+ default="0.0.0.0:5672", metavar="<server-address>",
+ help="Address of the server with syntax: hostname | ip-address [:<port>]")
+ parser.add_option("-m", "--mailbox", action="store", type="string",
+ default="mailbox", metavar="<mailbox-address>",
+ help="Name of the mailbox on the server.")
+ parser.add_option("-v", "--verbose", action="store_true",
+ help="Turn on extra trace messages.")
+
+ opts, self.messages = parser.parse_args() # uses sys.argv[1:]
+
+ self.server = opts.server
+ self.mailbox = opts.mailbox
+
+ addr = opts.server.rsplit(":", 1)
+ self.host = addr[0]
+ if len(addr) == 2:
+ self.port = addr[1]
+ else:
+ self.port = "5672"
+ self.verbose = opts.verbose
+
+
+class PostClient(object):
+ def __init__(self, host, port, mailbox):
+ """ Initialize the client by supplying the address of the server, and
+ the name of the mailbox to post to.
+ """
+ self.host = host
+ self.port = port
+ self.mailbox = mailbox
+ self.logging = False
+
+ def setup(self):
+ """ Setup and configure the connection to the server.
+ """
+ # setup a driver connection to the server
+
+ self.log("Connecting to server host = %s:%s" % (self.host, self.port))
+ self.driver = pn_driver();
+ self.cxtr = pn_connector(self.driver, self.host, self.port, None)
+
+ # configure SASL
+ self.sasl = pn_connector_sasl(self.cxtr)
+ pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
+ pn_sasl_client(self.sasl)
+
+ # inform the engine about the connection, and link the driver to it.
+ self.conn = pn_connection()
+ pn_connector_set_connection(self.cxtr, self.conn)
+
+ # create a session, and a link for sending to the mailbox
+ self.log("Posting to mailbox = %s" % self.mailbox)
+ self.ssn = pn_session(self.conn)
+ self.link = pn_sender(self.ssn, "sender")
+ pn_set_target(self.link, self.mailbox)
+
+ # now open all the engine endpoints
+ pn_connection_open(self.conn)
+ pn_session_open(self.ssn)
+ pn_link_open(self.link)
+
+
+ def wait(self):
+ """ Wait for an event to process.
+ """
+ self.log("Waiting for events...")
+
+ # prepare pending outbound data for the network
+ pn_connector_process(self.cxtr)
+
+ # wait forever for network event(s)
+ pn_driver_wait(self.driver, -1)
+
+ # process any data that arrived
+ pn_connector_process(self.cxtr)
+
+ self.log("...waiting done!")
+
+
+ def settle(self):
+ """ In order to be sure that the remote has accepted the message, we
+ need to wait until the message's delivery has been remotely settled.
+ Once that occurs, we can release the delivery by settling it.
+ """
+ d = pn_unsettled_head(self.link)
+ while d:
+ _next = pn_unsettled_next(d)
+ # if the remote has either settled this delivery OR set the
+ # disposition, we consider the message received.
+ disp = pn_remote_disp(d)
+ if disp and disp != PN_ACCEPTED:
+ print("Warning: message was not accepted by the remote!")
+ if disp or pn_remote_settled(d):
+ pn_settle(d)
+ d = _next
+
+
+ def enableLogging(self):
+ self.logging = True
+
+
+ def log(self, msg):
+ if self.logging:
+ print("%s" % msg)
+
+
+##################################################
+##################################################
+##################################################
+
+
+def main():
+ options = Options()
+
+ if len(options.messages) == 0:
+ print("No message data given!")
+ return -1
+
+ sender = PostClient(options.host,
+ options.port,
+ options.mailbox)
+ if options.verbose:
+ sender.enableLogging()
+
+ sender.setup()
+
+ # wait until we authenticate with the server
+ while pn_sasl_state(sender.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
+ sender.wait()
+
+ if pn_sasl_state(sender.sasl) == PN_SASL_FAIL:
+ print("Error: Authentication failure")
+ return -1
+
+ # main loop: send each message
+
+ pendingSends = list(options.messages)
+ while pendingSends:
+ # wait until the server grants us some send credit
+ if pn_credit(sender.link) == 0:
+ sender.log("wait for credit")
+ sender.wait()
+
+ while pendingSends and pn_credit(sender.link) > 0: # stop when out of messages or credit
+ msg = pendingSends.pop(0)
+ sender.log("sending %s" % msg)
+ d = pn_delivery(sender.link, "post-delivery-%s" % len(pendingSends))
+ rc = pn_send(sender.link, msg)
+ if (rc < 0):
+ print("Error: sending message: %s" % rc)
+ return -2
+ assert rc == len(msg)
+ pn_advance(sender.link) # deliver the message
+
+ # settle any deliveries that the server has accepted
+ sender.settle()
+
+ # done sending, now block until any pending deliveries are settled
+ while pn_unsettled(sender.link) > 0:
+ sender.wait()
+ sender.settle()
+
+ # we're done, close and wait for the remote to close also
+ pn_connection_close(sender.conn)
+ while not (pn_connection_state(sender.conn) & PN_REMOTE_CLOSED):
+ sender.wait()
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
+
+
+
+
+
+
Propchange: qpid/proton/trunk/examples/mailbox/post
------------------------------------------------------------------------------
svn:executable = *
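
The send loop in `post` is driven by credit: the client only sends while the link holds credit, and otherwise waits for the server to grant more. The sketch below models that pattern in plain Python with no Proton calls. `ToyLink`, `grant`, and `post_all` are hypothetical names for illustration, and the `grant` call stands in for what would be a blocking wait on the network in the real client.

```python
class ToyLink:
    """Hypothetical stand-in for a sender link; not part of Proton."""

    def __init__(self):
        self.credit = 0
        self.sent = []

    def grant(self, n):
        # The receiving peer allows n more messages.
        self.credit += n

    def send(self, msg):
        # Each message consumes one unit of credit.
        if self.credit <= 0:
            raise RuntimeError("no credit available")
        self.credit -= 1
        self.sent.append(msg)


def post_all(link, messages, credit_per_grant=1):
    """Send every message, pausing for credit whenever it runs out.

    Returns the number of times credit had to be requested.
    """
    pending = list(messages)
    grants = 0
    while pending:
        if link.credit == 0:
            # Stands in for sender.wait(): the peer grants more credit.
            link.grant(credit_per_grant)
            grants += 1
        # Send while there is both credit and something left to send.
        while link.credit > 0 and pending:
            link.send(pending.pop(0))
    return grants


link = ToyLink()
post_all(link, ["one", "two", "three"])
print(link.sent)
```

Checking `pending` in the inner loop matters when the peer grants more credit than there are messages left; otherwise `pop(0)` would be called on an empty list.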
Added: qpid/proton/trunk/examples/mailbox/server
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/mailbox/server?rev=1350403&view=auto
==============================================================================
--- qpid/proton/trunk/examples/mailbox/server (added)
+++ qpid/proton/trunk/examples/mailbox/server Thu Jun 14 21:05:59 2012
@@ -0,0 +1,373 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from optparse import OptionParser
+
+from cproton import *
+
+
+FAILED = 0
+CONNECTION_UP = 1
+AUTHENTICATING = 2
+
+counter = 0
+mailboxes = {}
+
+
+class Options(object):
+ def __init__(self):
+ parser = OptionParser(usage="usage: %prog [options] <server-address>")
+ parser.add_option("-v", "--verbose",
+ action="store_true", dest="verbose", default=False,
+ help="print status messages to stdout")
+
+ opts, self.server = parser.parse_args() # uses sys.argv[1:]
+ self.verbose = opts.verbose
+
+ if self.server:
+ addr = self.server[0].rsplit(":", 1)
+ self.host = addr[0]
+ if len(addr) == 2:
+ self.port = addr[1]
+ else:
+ self.port = "5672"
+ else:
+ self.host = "0.0.0.0"
+ self.port = "5672"
+
+
+class MailboxServer(object):
+ def __init__(self, host, port):
+ """ Initialize the server to wait on the given address for inbound
+ connection requests.
+ """
+ self.host = host
+ self.port = port
+ self.mailboxes = {}
+ self.logging = False # toggled by enableLogging(), read by log()
+ self.counter = 0
+
+
+ def setup(self):
+ """ Setup and configure the server
+ """
+ self.log("Server started, listening on %s:%s" % (self.host, self.port))
+ self.driver = pn_driver();
+ self.listener = pn_listener(self.driver, self.host, self.port, None)
+ if self.listener is None:
+ print("Error: could not listen on %s:%s" % (self.host, self.port))
+ return False
+ return True
+
+
+ def wait(self):
+ """ Wait for a network event from the driver
+ """
+ self.log("Driver sleep...")
+ pn_driver_wait(self.driver, -1)
+ self.log("...Driver wakeup.")
+
+
+ def acceptConnectionRequests(self):
+ """ Accept connection request coming from remote clients. Create a
+ connector to track each accepted network connection.
+ """
+ l = pn_driver_listener(self.driver)
+ while l:
+ self.log("Accepting Connection.")
+ cxtr = pn_listener_accept(l)
+ pn_connector_set_context(cxtr, AUTHENTICATING)
+ l = pn_driver_listener(self.driver)
+
+
+ def processConnections(self):
+ """ Check each connector for pending work.
+ """
+ cxtr = pn_driver_connector(self.driver)
+ while cxtr:
+ self.log("Process Connector")
+
+ # process any data coming from the network, this will update the
+ # engine's view of the state of the remote clients
+ pn_connector_process(cxtr)
+
+ state = pn_connector_context(cxtr)
+ if state == AUTHENTICATING:
+ # connection has not passed SASL authentication yet
+ self.authenticateConnector(cxtr)
+ elif state == CONNECTION_UP:
+ # active connection, service any engine events
+ self.serviceConnector(cxtr)
+ else:
+ print("Error: Unknown Connection state=%s" % state)
+
+ # now generate any outbound network data generated in response to
+ # any work done by the engine.
+ pn_connector_process(cxtr)
+
+ if pn_connector_closed(cxtr):
+ self.log("Closing connector")
+ pn_connector_destroy(cxtr)
+
+ cxtr = pn_driver_connector(self.driver)
+
+
+ def enableLogging(self):
+ self.logging = True
+
+
+ def log(self, msg):
+ if self.logging:
+ print("%s" % msg)
+
+
+ def authenticateConnector(self, cxtr):
+ """ Authenticate the remote client. Run the SASL algorithm until it
+ passes, fails, or needs more data from the remote.
+ """
+ self.log("Authenticating...")
+ sasl = pn_connector_sasl(cxtr)
+ state = pn_sasl_state(sasl)
+ while state == PN_SASL_CONF or state == PN_SASL_STEP:
+ if state == PN_SASL_CONF:
+ self.log("Authenticating-CONF...")
+ pn_sasl_mechanisms(sasl, "ANONYMOUS")
+ pn_sasl_server(sasl)
+ elif state == PN_SASL_STEP:
+ self.log("Authenticating-STEP...")
+ mech = pn_sasl_remote_mechanisms(sasl)
+ if mech == "ANONYMOUS":
+ pn_sasl_done(sasl, PN_SASL_OK)
+ else:
+ pn_sasl_done(sasl, PN_SASL_AUTH)
+ state = pn_sasl_state(sasl)
+
+ if state == PN_SASL_PASS:
+ pn_connector_set_connection(cxtr, pn_connection());
+ pn_connector_set_context(cxtr, CONNECTION_UP)
+ self.log("Authentication-PASSED")
+ elif state == PN_SASL_FAIL:
+ pn_connector_set_context(cxtr, FAILED)
+ self.log("Authentication-FAILED")
+ else:
+ self.log("Authentication-PENDING")
+
+
+ def setupLink(self, link):
+ """ Configure a link coming from a client.
+ """
+ r_tgt = pn_remote_target(link)
+ r_src = pn_remote_source(link)
+
+ if pn_is_sender(link):
+ self.log("Opening Link to read from mailbox: %s" % r_src)
+ if r_src not in self.mailboxes:
+ print("Error: mailbox %s does not exist!" % r_src)
+ r_src = None # indicate to remote the mailbox does not exist
+ else:
+ self.log("Opening Link to write to mailbox: %s" % r_tgt)
+ if r_tgt not in self.mailboxes:
+ self.mailboxes[r_tgt] = [] # create a new mailbox
+
+ pn_set_target(link, r_tgt)
+ pn_set_source(link, r_src)
+
+ if pn_is_sender(link):
+ # grant a delivery to the link - it will become "writable" when the
+ # driver can accept messages for the sender.
+ pn_delivery(link, "server-delivery-%d" % self.counter)
+ self.counter += 1
+ else:
+ # Grant enough credit to the receiver to allow one inbound message
+ pn_flow(link, 1)
+
+ pn_link_open(link)
+
+
+ def serviceConnector(self, cxtr):
+ """ Process any pending I/O events on the given connector once it has been
+ authenticated.
+ """
+ self.log("I/O processing start.")
+
+ # get the engine's connection from the driver
+ conn = pn_connector_connection(cxtr)
+
+ ## Step 1: setup the engine's connection, and any sessions and links
+ ## that may be pending.
+
+ # initialize the connection if it's new
+ if pn_connection_state(conn) & PN_LOCAL_UNINIT:
+ self.log("Connection Opened.")
+ pn_connection_open(conn)
+
+ # open all pending sessions
+ ssn = pn_session_head(conn, PN_LOCAL_UNINIT)
+ while ssn:
+ pn_session_open(ssn)
+ self.log("Session Opened.")
+ ssn = pn_session_next(ssn, PN_LOCAL_UNINIT)
+
+ # configure and open any pending links
+ link = pn_link_head(conn, PN_LOCAL_UNINIT);
+ while link:
+ self.setupLink(link)
+ link = pn_link_next(link, PN_LOCAL_UNINIT);
+
+ ## Step 2: Now drain all the pending deliveries from the connection's
+ ## work queue and process them
+
+ delivery = pn_work_head(conn)
+ while delivery:
+ self.log("Process delivery %s." % pn_delivery_tag(delivery))
+
+ if pn_readable(delivery): # inbound data available
+ self.processReceive(delivery)
+ elif pn_writable(delivery): # can send a message
+ self.sendMessage(delivery)
+
+ if pn_updated(delivery):
+ # check to see if the remote has accepted message we sent
+ self.log("remote disposition for %s: %s " % (str(pn_delivery_tag(delivery)),
+ str(pn_remote_disp(delivery))))
+ if pn_remote_disp(delivery):
+ # once we know the remote has seen the message, we can
+ # release the delivery.
+ pn_settle(delivery)
+
+ delivery = pn_work_next(delivery)
+
+ ## Step 3: Clean up any links or sessions that have been closed by the
+ ## remote. If the connection has been closed remotely, clean that up
+ ## also.
+
+ # teardown any terminating links
+ link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+ while link:
+ pn_link_close(link)
+ self.log("Link Closed")
+ link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+
+ # teardown any terminating sessions
+ ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+ while ssn:
+ pn_session_close(ssn)
+ self.log("Session Closed")
+ ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
+
+ # teardown the connection if it's terminating
+ if pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED):
+ self.log("Connection Closed")
+ pn_connection_close(conn)
+
+
+ def processReceive(self, delivery):
+ """ A delivery has indicated that message data is available. Read the
+ message data, process it, then accept the message.
+ """
+ link = pn_link(delivery)
+ mbox = pn_remote_target(link)
+ rc, msg = pn_recv(link, 1024);
+ self.log("Msg Received %d" % rc);
+ while rc >= 0:
+ if mbox not in self.mailboxes:
+ print("Error: cannot send to mailbox %s - dropping message." % mbox)
+ else:
+ self.mailboxes[mbox].append(msg)
+ self.log("Mailbox %s contains: %s" % (mbox, str(self.mailboxes[mbox])))
+ rc, msg = pn_recv(link, 1024);
+
+ # now that we hit the end of the message, update the
+ # disposition to let the remote know we accepted it.
+ self.log("Msg Accepted.");
+ pn_disposition(delivery, PN_ACCEPTED)
+
+ # since we no longer have any more work to do on this delivery, finish
+ # it and move to the next.
+ pn_settle(delivery)
+ pn_advance(link)
+
+ # if more credit is needed, grant it
+ if pn_credit(link) == 0:
+ pn_flow(link, 1)
+
+
+ def sendMessage(self, delivery):
+ """ A delivery has indicated that the link is able to accept a
+ message. Send a message over the link, but do not settle it until the
+ remote accepts it (and updates the delivery's disposition).
+ """
+ link = pn_link(delivery)
+ mbox = pn_remote_source(link)
+ self.log("Request for Mailbox=%s" % str(mbox))
+ if mbox in self.mailboxes and self.mailboxes[mbox]:
+ msg = self.mailboxes[mbox].pop(0)
+ self.log("Fetching message %s" % str(msg))
+ else:
+ print("Warning: mailbox %s is empty, sending empty message." % mbox)
+ msg = ""
+ sent = pn_send(link, msg)
+ assert(sent == len(msg))
+ self.log("Msg Sent %d" % sent);
+
+ # if the link can accept more, grant another delivery
+ # (TODO: confirm with Rafi that this check is correct)
+ if pn_advance(link):
+ pn_delivery(link, "server-delivery-%d" % self.counter)
+ self.counter += 1
+
+ # do not settle the delivery now - wait until the remote sets the disposition.
+
+
+##################################################
+##################################################
+##################################################
+
+
+def main():
+ options = Options()
+
+ server = MailboxServer(options.host, options.port)
+ if (options.verbose):
+ server.enableLogging()
+
+ if not server.setup():
+ return -1;
+
+ while True:
+ # wait for a driver event
+ server.wait()
+
+ # accept all pending connection requests
+ server.acceptConnectionRequests()
+
+ # process all connectors with pending events
+ server.processConnections()
+
+
+if __name__ == "__main__":
+ sys.exit(main())
+
+
+
+
+
+
Propchange: qpid/proton/trunk/examples/mailbox/server
------------------------------------------------------------------------------
svn:executable = *
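
The server tags each connector with one of three context values (`AUTHENTICATING`, `CONNECTION_UP`, `FAILED`) and dispatches on that value in `processConnections`. The transition made in `authenticateConnector` can be sketched as a small pure function. The constants match the script, but `next_state` and the string labels for the SASL outcome are assumptions made for this illustration; the real code compares against `PN_SASL_PASS` and `PN_SASL_FAIL`.

```python
FAILED = 0
CONNECTION_UP = 1
AUTHENTICATING = 2


def next_state(state, sasl_outcome):
    """Return the connector state after one authentication pass.

    sasl_outcome is one of 'pass', 'fail', or 'pending'; these labels are
    hypothetical stand-ins for the SASL states used by the server script.
    """
    if state != AUTHENTICATING:
        # Only connectors that are still authenticating can change state here.
        return state
    if sasl_outcome == "pass":
        return CONNECTION_UP
    if sasl_outcome == "fail":
        return FAILED
    # More data is needed from the remote; try again on the next event.
    return AUTHENTICATING


state = AUTHENTICATING
for outcome in ("pending", "pending", "pass"):
    state = next_state(state, outcome)
print(state == CONNECTION_UP)
```

In the committed server a connector left in the `FAILED` state falls through to the "Unknown Connection state" branch on later passes, so a fuller version would likely want an explicit branch that closes such connectors.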