Added SSL support
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/dcec9ff0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/dcec9ff0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/dcec9ff0 Branch: refs/heads/master Commit: dcec9ff064cc7cae88b6a44e497c04c9cbeaece7 Parents: 93095ed Author: Gordon Sim <[email protected]> Authored: Wed Jan 28 21:53:17 2015 +0000 Committer: Gordon Sim <[email protected]> Committed: Thu Jan 29 19:42:22 2015 +0000 ---------------------------------------------------------------------- proton-c/bindings/python/proton/reactors.py | 72 +++++++++++++++++------- proton-c/bindings/python/proton/utils.py | 4 +- 2 files changed, 55 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dcec9ff0/proton-c/bindings/python/proton/reactors.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py index c5eddd9..463bf2d 100644 --- a/proton-c/bindings/python/proton/reactors.py +++ b/proton-c/bindings/python/proton/reactors.py @@ -20,7 +20,7 @@ import logging, os, Queue, socket, time, types from heapq import heappush, heappop, nsmallest from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message -from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol +from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, symbol from proton import Terminus, Timeout, Transport, TransportException, ulong, Url from select import select from proton.handlers import OutgoingMessageHandler, ScopedHandler @@ -45,7 +45,9 @@ class AmqpSocket(object): self.read_done = False self._closed 
= False - def accept(self, force_sasl=True): + def accept(self, force_sasl=True, ssl_domain=None): + if ssl_domain: + self.ssl = SSL(self.transport, ssl_domain) if force_sasl: sasl = self.transport.sasl() sasl.mechanisms("ANONYMOUS") @@ -54,7 +56,10 @@ class AmqpSocket(object): #TODO: use SASL anyway if requested by peer return self - def connect(self, host, port=None, username=None, password=None, force_sasl=True): + def connect(self, host, port=None, username=None, password=None, force_sasl=True, ssl_domain=None): + if ssl_domain: + self.ssl = SSL(self.transport, ssl_domain) + self.ssl.peer_hostname = host if username and password: sasl = self.transport.sasl() sasl.plain(username, password) @@ -165,7 +170,7 @@ class AmqpAcceptor: itself be added to an io loop. """ - def __init__(self, events, loop, host, port): + def __init__(self, events, loop, host, port, ssl_domain=None): self.events = events self.loop = loop self.socket = socket.socket() @@ -173,6 +178,7 @@ class AmqpAcceptor: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((host, port)) self.socket.listen(5) + self.ssl_domain = ssl_domain self.loop.add(self) self._closed = False @@ -198,7 +204,7 @@ class AmqpAcceptor: def readable(self): sock, addr = self.socket.accept() if sock: - self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept()) + self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept(ssl_domain=self.ssl_domain)) def removed(self): pass def tick(self): return None @@ -634,16 +640,27 @@ class Connector(Handler): Internal handler that triggers the necessary socket connect for an opened connection. 
""" - def __init__(self, loop): + def __init__(self, loop, ssl_domain=None): self.loop = loop + self.ssl_domain = ssl_domain + + def _get_ssl_domain(self, connection, scheme): + if hasattr(connection, 'ssl_domain'): + return connection.ssl_domain + elif scheme == 'amqps': + return self.ssl_domain + else: + return None def _connect(self, connection): - host, port = connection.address.next() - logging.info("connecting to %s:%i" % (host, port)) + url = connection.address.next() + logging.info("connecting to %s:%i" % (url.host, url.port)) heartbeat = None if hasattr(connection, 'heartbeat'): heartbeat = connection.heartbeat - self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port)) + s = AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat) + s.connect(url.host, url.port, username=url.username, password=url.password, ssl_domain=self._get_ssl_domain(connection, url.scheme)) + self.loop.add(s) connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference def on_connection_local_open(self, event): @@ -698,26 +715,38 @@ class Urls(object): def __iter__(self): return self - def _as_pair(self, url): - return (url.host, url.port) - def next(self): try: - return self._as_pair(self.i.next()) + return self.i.next() except StopIteration: self.i = iter(self.values) - return self._as_pair(self.i.next()) + return self.i.next() + +class SSLConfig(object): + def __init__(self): + self.client = SSLDomain(SSLDomain.MODE_CLIENT) + self.server = SSLDomain(SSLDomain.MODE_SERVER) + + def set_credentials(self, cert_file, key_file, password): + self.client.set_credentials(cert_file, key_file, password) + self.server.set_credentials(cert_file, key_file, password) + + def set_trusted_ca_db(self, certificate_db): + self.client.set_trusted_ca_db(certificate_db) + self.server.set_trusted_ca_db(certificate_db) + class Container(object): def __init__(self, *handlers): - h = 
[Connector(self), ScopedHandler()] + self.ssl = SSLConfig() + h = [Connector(self, self.ssl.client), ScopedHandler()] h.extend(handlers) self.events = Events(*h) self.loop = SelectLoop(self.events) self.trigger = None self.container_id = str(generate_uuid()) - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None): + def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None): conn = self.events.connection() conn._pin = conn #circular reference until the open event gets handled if handler: @@ -732,6 +761,8 @@ class Container(object): conn.reconnect = reconnect elif reconnect is None: conn.reconnect = Backoff() + if ssl_domain: + conn.ssl_domain = ssl_domain conn._session_policy = SessionPerConnection() #todo: make configurable conn.open() return conn @@ -800,9 +831,12 @@ class Container(object): context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) return Transaction(context._txn_ctrl, handler, settle_before_discharge) - def listen(self, url): - host, port = Urls([url]).next() - return AmqpAcceptor(self.events, self, host, port) + def listen(self, url, ssl_domain=None): + url = Urls([url]).next() + ssl_config = ssl_domain + if not ssl_config and url.scheme == 'amqps': + ssl_config = self.ssl_domain + return AmqpAcceptor(self.events, self, url.host, url.port, ssl_domain=ssl_config) def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dcec9ff0/proton-c/bindings/python/proton/utils.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py index 4c9d509..d2ece79 100644 --- a/proton-c/bindings/python/proton/utils.py +++ 
b/proton-c/bindings/python/proton/utils.py @@ -139,11 +139,11 @@ class BlockingConnection(Handler): """ A synchronous style connection wrapper. """ - def __init__(self, url, timeout=None, container=None): + def __init__(self, url, timeout=None, container=None, ssl_domain=None): self.timeout = timeout self.container = container or Container() self.url = Url(utf8(url)).defaults() - self.conn = self.container.connect(url=self.url, handler=self) + self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain) self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), msg="Opening connection") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
