Hello community, here is the log from the commit of package python-kombu for openSUSE:Factory checked in at 2014-01-15 16:26:53 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-kombu (Old) and /work/SRC/openSUSE:Factory/.python-kombu.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-kombu" Changes: -------- --- /work/SRC/openSUSE:Factory/python-kombu/python-kombu.changes 2013-12-10 20:06:32.000000000 +0100 +++ /work/SRC/openSUSE:Factory/.python-kombu.new/python-kombu.changes 2014-01-15 16:26:54.000000000 +0100 @@ -1,0 +2,15 @@ +Mon Jan 13 13:36:27 UTC 2014 - [email protected] + +- update to 3.0.8: + - Redis: Would attempt to read from the wrong connection if a select/epoll/kqueue + exception event happened. + - Redis: Disabling ack emulation now works properly. + - Redis: :exc:`IOError` and :exc:`OSError` are now treated as recoverable + connection errors. + - SQS: Improved performance by reading messages in bulk. + - Connection Pool: Attempting to acquire from a closed pool will now +- Changes from 3.0.7: + - Fixes Python 2.6 compatibility. + - Redis: Fixes 'bad file descriptor' issue. + +------------------------------------------------------------------- Old: ---- kombu-3.0.6.tar.gz New: ---- kombu-3.0.8.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-kombu.spec ++++++ --- /var/tmp/diff_new_pack.0nMgqS/_old 2014-01-15 16:26:54.000000000 +0100 +++ /var/tmp/diff_new_pack.0nMgqS/_new 2014-01-15 16:26:54.000000000 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-kombu # -# Copyright (c) 2013 SUSE LINUX Products GmbH, Nuernberg, Germany. +# Copyright (c) 2014 SUSE LINUX Products GmbH, Nuernberg, Germany. 
# # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -17,7 +17,7 @@ Name: python-kombu -Version: 3.0.6 +Version: 3.0.8 Release: 0 Summary: AMQP Messaging Framework for Python License: BSD-3-Clause ++++++ kombu-3.0.6.tar.gz -> kombu-3.0.8.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/AUTHORS new/kombu-3.0.8/AUTHORS --- old/kombu-3.0.6/AUTHORS 2013-11-20 17:38:01.000000000 +0100 +++ new/kombu-3.0.8/AUTHORS 2013-12-13 17:25:22.000000000 +0100 @@ -59,9 +59,11 @@ Mahendra M <[email protected]> Marcin Lulek (ergo) <[email protected]> Mark Lavin <[email protected]> +Matt Wise <[email protected]> Maxime Rouyrre <[email protected]> Mher Movsisyan <[email protected]> Michael Barrett <[email protected]> +Michael Nelson <[email protected]> Nitzan Miron <[email protected]> Noah Kantrowitz <[email protected]> Ollie Walsh <[email protected]> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/Changelog new/kombu-3.0.8/Changelog --- old/kombu-3.0.6/Changelog 2013-11-21 17:47:12.000000000 +0100 +++ new/kombu-3.0.8/Changelog 2013-12-16 18:03:33.000000000 +0100 @@ -4,6 +4,49 @@ Change history ================ +.. _version-3.0.8: + +3.0.8 +===== +:release-date: 2013-12-16 17:00 P.M UTC +:release-by: Ask Solem + +- Serializer: loads and dumps now wraps exceptions raised into + :exc:`~kombu.exceptions.DecodeError` and + :exc:`kombu.exceptions.EncodeError` respectively. + + Contributed by Ionel Cristian Maries + +- Redis: Would attempt to read from the wrong connection if a select/epoll/kqueue + exception event happened. + + Fix contributed by Michael Nelson. + +- Redis: Disabling ack emulation now works properly. + + Fix contributed by Michael Nelson. + +- Redis: :exc:`IOError` and :exc:`OSError` are now treated as recoverable + connection errors. 
+ +- SQS: Improved performance by reading messages in bulk. + + Contributed by Matt Wise. + +- Connection Pool: Attempting to acquire from a closed pool will now + raise :class:`RuntimeError`. + +.. _version-3.0.7: + +3.0.7 +===== +:release-date: 2013-12-02 04:00 P.M UTC +:release-by: Ask Solem + +- Fixes Python 2.6 compatibility. + +- Redis: Fixes 'bad file descriptor' issue. + .. _version-3.0.6: 3.0.6 @@ -37,7 +80,7 @@ :release-date: 2013-11-15 11:00 P.M UTC :release-by: Ask Solem -- Now depends on :mod:`amqp` 3.0.3. +- Now depends on :mod:`amqp` 1.3.3. - Redis: Fixed Python 3 compatibility problem (Issue #270). diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/PKG-INFO new/kombu-3.0.8/PKG-INFO --- old/kombu-3.0.6/PKG-INFO 2013-11-21 17:49:02.000000000 +0100 +++ new/kombu-3.0.8/PKG-INFO 2013-12-16 18:35:35.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: kombu -Version: 3.0.6 +Version: 3.0.8 Summary: Messaging library for Python Home-page: http://kombu.readthedocs.org Author: Ask Solem @@ -12,7 +12,7 @@ kombu - Messaging library for Python ======================================== - :Version: 3.0.6 + :Version: 3.0.8 `Kombu` is a messaging library for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/README.rst new/kombu-3.0.8/README.rst --- old/kombu-3.0.6/README.rst 2013-11-21 17:48:15.000000000 +0100 +++ new/kombu-3.0.8/README.rst 2013-12-16 17:12:00.000000000 +0100 @@ -4,7 +4,7 @@ kombu - Messaging library for Python ======================================== -:Version: 3.0.6 +:Version: 3.0.8 `Kombu` is a messaging library for Python. 
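Note on the 3.0.8 changelog entry about serializer errors: the effect of wrapping exceptions raised by loads and dumps can be sketched as below. This is a minimal Python 3 illustration, not kombu's code. The exception classes are local stand-ins for those in kombu.exceptions, `reraise_as` is a hypothetical helper name, and json stands in for whichever serializer is registered.

```python
import json
from contextlib import contextmanager


class SerializationError(Exception):
    """Local stand-in for kombu.exceptions.SerializationError."""


class EncodeError(SerializationError):
    """Local stand-in for kombu.exceptions.EncodeError."""


class DecodeError(SerializationError):
    """Local stand-in for kombu.exceptions.DecodeError."""


@contextmanager
def reraise_as(wrapper, include=(Exception,), exclude=()):
    """Re-raise errors from the managed block as `wrapper` (hypothetical helper)."""
    try:
        yield
    except exclude:
        # Errors listed in `exclude` pass through unchanged.
        raise
    except include as exc:
        # Python 3 exception chaining keeps the original error in __cause__.
        raise wrapper(exc) from exc


def dumps(obj):
    # Any failure inside the encoder surfaces as EncodeError.
    with reraise_as(EncodeError):
        return json.dumps(obj)


def loads(data):
    # Any failure inside the decoder surfaces as DecodeError.
    with reraise_as(DecodeError):
        return json.loads(data)
```

With this shape a caller can catch one exception type per direction instead of every error an individual serializer might raise; the original error remains available through `__cause__`.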
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/docs/changelog.rst new/kombu-3.0.8/docs/changelog.rst --- old/kombu-3.0.6/docs/changelog.rst 2013-11-21 17:47:12.000000000 +0100 +++ new/kombu-3.0.8/docs/changelog.rst 2013-12-16 18:03:33.000000000 +0100 @@ -4,6 +4,49 @@ Change history ================ +.. _version-3.0.8: + +3.0.8 +===== +:release-date: 2013-12-16 17:00 P.M UTC +:release-by: Ask Solem + +- Serializer: loads and dumps now wraps exceptions raised into + :exc:`~kombu.exceptions.DecodeError` and + :exc:`kombu.exceptions.EncodeError` respectively. + + Contributed by Ionel Cristian Maries + +- Redis: Would attempt to read from the wrong connection if a select/epoll/kqueue + exception event happened. + + Fix contributed by Michael Nelson. + +- Redis: Disabling ack emulation now works properly. + + Fix contributed by Michael Nelson. + +- Redis: :exc:`IOError` and :exc:`OSError` are now treated as recoverable + connection errors. + +- SQS: Improved performance by reading messages in bulk. + + Contributed by Matt Wise. + +- Connection Pool: Attempting to acquire from a closed pool will now + raise :class:`RuntimeError`. + +.. _version-3.0.7: + +3.0.7 +===== +:release-date: 2013-12-02 04:00 P.M UTC +:release-by: Ask Solem + +- Fixes Python 2.6 compatibility. + +- Redis: Fixes 'bad file descriptor' issue. + .. _version-3.0.6: 3.0.6 @@ -37,7 +80,7 @@ :release-date: 2013-11-15 11:00 P.M UTC :release-by: Ask Solem -- Now depends on :mod:`amqp` 3.0.3. +- Now depends on :mod:`amqp` 1.3.3. - Redis: Fixed Python 3 compatibility problem (Issue #270). 
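Note on the "Connection Pool" changelog entry: the closed-pool check added in kombu/connection.py can be illustrated with a small stand-in class. This is a sketch only. `ClosablePool` is a hypothetical name, the hunk does not show the name of the method that sets the flag (so `force_close_all` here is an assumption), and the standard library LifoQueue replaces kombu's internal queue.

```python
import queue


class ClosablePool:
    """Hypothetical stand-in for a resource pool with a closed flag."""

    def __init__(self, factory, limit=2):
        self._closed = False
        self._resource = queue.LifoQueue()
        # Pre-fill the pool with `limit` resources built by `factory`.
        for _ in range(limit):
            self._resource.put_nowait(factory())

    def acquire(self):
        # Same guard and message as the check added in the diff.
        if self._closed:
            raise RuntimeError('Acquire on closed pool')
        return self._resource.get_nowait()

    def release(self, resource):
        self._resource.put_nowait(resource)

    def force_close_all(self):
        # Mark the pool closed first, then drop whatever is still queued.
        self._closed = True
        while True:
            try:
                self._resource.get_nowait()
            except queue.Empty:
                break
```

Setting the flag before draining means a later acquire fails with a clear RuntimeError instead of blocking or reporting an empty pool.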
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/docs/introduction.rst new/kombu-3.0.8/docs/introduction.rst --- old/kombu-3.0.6/docs/introduction.rst 2013-11-21 17:48:15.000000000 +0100 +++ new/kombu-3.0.8/docs/introduction.rst 2013-12-16 17:12:00.000000000 +0100 @@ -4,7 +4,7 @@ kombu - Messaging library for Python ======================================== -:Version: 3.0.6 +:Version: 3.0.8 `Kombu` is a messaging library for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/examples/complete_receive.py new/kombu-3.0.8/examples/complete_receive.py --- old/kombu-3.0.6/examples/complete_receive.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/examples/complete_receive.py 2013-12-13 15:41:17.000000000 +0100 @@ -7,6 +7,8 @@ #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. +exchange = Exchange('kombu_demo', type='direct') +queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') def pretty(obj): @@ -24,16 +26,7 @@ #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -with Connection('pyamqp://guest:guest@localhost:5672//') as connection: - # The configuration of the message flow is as follows: - # gateway_kombu_exchange -> internal_kombu_exchange -> kombu_demo queue - gateway_exchange = Exchange('gateway_kombu_demo')(connection) - exchange = Exchange('internal_kombu_demo')(connection) - gateway_exchange.declare() - exchange.declare() - exchange.bind_to(gateway_exchange, routing_key='kombu_demo') - - queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') +with Connection('amqp://guest:guest@localhost:5672//') as connection: #: Create consumer using our callback and queue. 
#: Second argument can also be a list to consume from diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/__init__.py new/kombu-3.0.8/kombu/__init__.py --- old/kombu-3.0.6/kombu/__init__.py 2013-11-21 17:48:12.000000000 +0100 +++ new/kombu-3.0.8/kombu/__init__.py 2013-12-16 18:03:28.000000000 +0100 @@ -7,7 +7,7 @@ 'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'), ) -VERSION = version_info_t(3, 0, 6, '', '') +VERSION = version_info_t(3, 0, 8, '', '') __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION) __author__ = 'Ask Solem' __contact__ = '[email protected]' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/clocks.py new/kombu-3.0.8/kombu/clocks.py --- old/kombu-3.0.6/kombu/clocks.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/clocks.py 2013-12-13 15:41:17.000000000 +0100 @@ -107,8 +107,8 @@ def adjust(self, other): with self.mutex: - self.value = max(self.value, other) + 1 - return self.value + value = self.value = max(self.value, other) + 1 + return value def forward(self): with self.mutex: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/connection.py new/kombu-3.0.8/kombu/connection.py --- old/kombu-3.0.6/kombu/connection.py 2013-11-21 15:20:27.000000000 +0100 +++ new/kombu-3.0.8/kombu/connection.py 2013-12-13 15:41:17.000000000 +0100 @@ -10,7 +10,6 @@ import os import socket -from collections import Callable from contextlib import contextmanager from functools import partial from itertools import count, cycle @@ -837,6 +836,7 @@ def __init__(self, limit=None, preload=None): self.limit = limit self.preload = preload or 0 + self._closed = False self._resource = _LifoQueue() self._dirty = set() @@ -865,6 +865,8 @@ and the limit has been exceeded. 
""" + if self._closed: + raise RuntimeError('Acquire on closed pool') if self.limit: while 1: try: @@ -934,6 +936,7 @@ after fork (e.g. sockets/connections). """ + self._closed = True dirty = self._dirty resource = self._resource while 1: # - acquired @@ -1024,7 +1027,7 @@ self._resource.put_nowait(conn) def prepare(self, resource): - if isinstance(resource, Callable): + if callable(resource): resource = resource() resource._debug('acquired') return resource @@ -1049,7 +1052,7 @@ i < self.preload and channel() or lazy(channel)) def prepare(self, channel): - if isinstance(channel, Callable): + if callable(channel): channel = channel() return channel diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/entity.py new/kombu-3.0.8/kombu/entity.py --- old/kombu-3.0.6/kombu/entity.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/entity.py 2013-12-13 15:41:17.000000000 +0100 @@ -661,11 +661,11 @@ def __repr__(self): s = super(Queue, self).__repr__ if self.bindings: - return s('Queue {0.name!r} -> {bindings}'.format( + return s('Queue {0.name} -> {bindings}'.format( self, bindings=pretty_bindings(self.bindings), )) return s( - 'Queue {0.name!r} -> {0.exchange!r} -> {0.routing_key}'.format( + 'Queue {0.name} -> {0.exchange!r} -> {0.routing_key}'.format( self)) @property diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/exceptions.py new/kombu-3.0.8/kombu/exceptions.py --- old/kombu-3.0.6/kombu/exceptions.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/exceptions.py 2013-12-16 17:42:05.000000000 +0100 @@ -22,6 +22,20 @@ class KombuError(Exception): """Common subclass for all Kombu exceptions.""" + pass + + +class SerializationError(KombuError): + """Failed to serialize/deserialize content.""" + + +class EncodeError(SerializationError): + """Cannot encode object.""" + pass + + +class DecodeError(SerializationError): + """Cannot 
decode object.""" class NotBoundError(KombuError): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/pools.py new/kombu-3.0.8/kombu/pools.py --- old/kombu-3.0.6/kombu/pools.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/pools.py 2013-12-13 15:41:17.000000000 +0100 @@ -9,7 +9,6 @@ import os -from collections import Callable from itertools import chain from .connection import Resource @@ -58,7 +57,7 @@ pass def prepare(self, p): - if isinstance(p, Callable): + if callable(p): p = p() if p._channel is None: conn = self._acquire_connection() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/serialization.py new/kombu-3.0.8/kombu/serialization.py --- old/kombu-3.0.6/kombu/serialization.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/serialization.py 2013-12-16 17:49:57.000000000 +0100 @@ -18,9 +18,12 @@ cpickle = None # noqa from collections import namedtuple +from contextlib import contextmanager -from .exceptions import SerializerNotInstalled, ContentDisallowed -from .five import BytesIO, text_t +from .exceptions import ( + ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled +) +from .five import BytesIO, reraise, text_t from .utils import entrypoints from .utils.encoding import str_to_bytes, bytes_t @@ -44,6 +47,17 @@ codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder')) +@contextmanager +def _reraise_errors(wrapper, + include=(Exception, ), exclude=(SerializerNotInstalled, )): + try: + yield + except exclude: + raise + except include as exc: + reraise(wrapper, wrapper(exc), sys.exc_info()[2]) + + def pickle_loads(s, load=pickle_load): # used to support buffer objects return load(BytesIO(s)) @@ -133,7 +147,8 @@ # For Unicode objects, force it into a string if not serializer and isinstance(data, text_t): - payload = data.encode('utf-8') + with _reraise_errors(EncodeError, 
exclude=()): + payload = data.encode('utf-8') return 'text/plain', 'utf-8', payload if serializer: @@ -144,7 +159,8 @@ content_type = self._default_content_type content_encoding = self._default_content_encoding - payload = encoder(data) + with _reraise_errors(EncodeError): + payload = encoder(data) return content_type, content_encoding, payload encode = dumps # XXX compat @@ -162,10 +178,12 @@ if data: decode = self._decoders.get(content_type) if decode: - return decode(data) + with _reraise_errors(DecodeError): + return decode(data) if content_encoding not in SKIP_DECODE and \ not isinstance(data, text_t): - return _decode(data, content_encoding) + with _reraise_errors(DecodeError): + return _decode(data, content_encoding) return data decode = loads # XXX compat @@ -278,7 +296,8 @@ payload = data if isinstance(payload, text_t): content_encoding = 'utf-8' - payload = payload.encode(content_encoding) + with _reraise_errors(EncodeError, exclude=()): + payload = payload.encode(content_encoding) else: content_encoding = 'binary' return content_type, content_encoding, payload diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/tests/test_serialization.py new/kombu-3.0.8/kombu/tests/test_serialization.py --- old/kombu-3.0.6/kombu/tests/test_serialization.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/tests/test_serialization.py 2013-12-16 17:51:25.000000000 +0100 @@ -7,7 +7,7 @@ from base64 import b64decode -from kombu.exceptions import ContentDisallowed +from kombu.exceptions import ContentDisallowed, EncodeError, DecodeError from kombu.five import text_t, bytes_t from kombu.serialization import ( registry, register, SerializerNotInstalled, @@ -186,6 +186,15 @@ call('pickle'), call('yaml'), call('doomsday') ]) + def test_reraises_EncodeError(self): + with self.assertRaises(EncodeError): + dumps([object()], serializer='json') + + def test_reraises_DecodeError(self): + with 
self.assertRaises(DecodeError): + loads(object(), content_type='application/json', + content_encoding='utf-8') + def test_json_loads(self): self.assertEqual( py_data, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/tests/transport/test_SQS.py new/kombu-3.0.8/kombu/tests/transport/test_SQS.py --- old/kombu-3.0.6/kombu/tests/transport/test_SQS.py 1970-01-01 01:00:00.000000000 +0100 +++ new/kombu-3.0.8/kombu/tests/transport/test_SQS.py 2013-12-13 17:25:22.000000000 +0100 @@ -0,0 +1,293 @@ +"""Testing module for the kombu.transport.SQS package. + +NOTE: The SQSQueueMock and SQSConnectionMock classes originally come from +http://github.com/pcsforeducation/sqs-mock-python. They have been patched +slightly. +""" + +from __future__ import absolute_import + +from kombu import Connection +from kombu import messaging +from kombu import five +from kombu.tests.case import Case, SkipTest +import kombu + +try: + from kombu.transport import SQS +except ImportError: + # Boto must not be installed if the SQS transport fails to import, + # so we skip all unit tests. Set SQS to None here, and it will be + # checked during the setUp() phase later. 
+ SQS = None + + +class SQSQueueMock(object): + + def __init__(self, name): + self.name = name + self.messages = [] + self._get_message_calls = 0 + + def clear(self, page_size=10, vtimeout=10): + empty, self.messages[:] = not self.messages, [] + return not empty + + def count(self, page_size=10, vtimeout=10): + return len(self.messages) + count_slow = count + + def delete(self): + self.messages[:] = [] + return True + + def delete_message(self, message): + try: + self.messages.remove(message) + except ValueError: + return False + return True + + def get_messages(self, num_messages=1, visibility_timeout=None, + attributes=None, *args, **kwargs): + self._get_message_calls += 1 + return self.messages[:num_messages] + + def read(self, visibility_timeout=None): + return self.messages.pop(0) + + def write(self, message): + self.messages.append(message) + return True + + +class SQSConnectionMock(object): + + def __init__(self): + self.queues = {} + + def get_queue(self, queue): + return self.queues.get(queue) + + def get_all_queues(self, prefix=""): + return self.queues.values() + + def delete_queue(self, queue, force_deletion=False): + q = self.get_queue(queue) + if q: + if q.count(): + return False + q.clear() + self.queues.pop(queue, None) + + def delete_message(self, queue, message): + return queue.delete_message(message) + + def create_queue(self, name, *args, **kwargs): + q = self.queues[name] = SQSQueueMock(name) + return q + + +class test_Channel(Case): + + def handleMessageCallback(self, message): + self.callback_message = message + + def setUp(self): + """Mock the back-end SQS classes""" + # Sanity check... if SQS is None, then it did not import and we + # cannot execute our tests. 
+ if SQS is None: + raise SkipTest('Boto is not installed') + + SQS.Channel._queue_cache.clear() + + # Common variables used in the unit tests + self.queue_name = 'unittest' + + # Mock the sqs() method that returns an SQSConnection object and + # instead return an SQSConnectionMock() object. + self.sqs_conn_mock = SQSConnectionMock() + + def mock_sqs(): + return self.sqs_conn_mock + SQS.Channel.sqs = mock_sqs() + + # Set up a task exchange for passing tasks through the queue + self.exchange = kombu.Exchange('test_SQS', type='direct') + self.queue = kombu.Queue(self.queue_name, + self.exchange, + self.queue_name) + + # Mock up a test SQS Queue with the SQSQueueMock class (and always + # make sure its a clean empty queue) + self.sqs_queue_mock = SQSQueueMock(self.queue_name) + + # Now, create our Connection object with the SQS Transport and store + # the connection/channel objects as references for use in these tests. + self.connection = Connection(transport=SQS.Transport) + self.channel = self.connection.channel() + + self.queue(self.channel).declare() + self.producer = messaging.Producer(self.channel, + self.exchange, + routing_key=self.queue_name) + + # Lastly, make sure that we're set up to 'consume' this queue. 
+ self.channel.basic_consume(self.queue_name, + no_ack=True, + callback=self.handleMessageCallback, + consumer_tag='unittest') + + def test_init(self): + """kombu.SQS.Channel instantiates correctly with mocked queues""" + self.assertIn(self.queue_name, self.channel._queue_cache) + + def test_new_queue(self): + queue_name = "new_unittest_queue" + self.channel._new_queue(queue_name) + self.assertIn(queue_name, self.sqs_conn_mock.queues) + # For cleanup purposes, delete the queue and the queue file + self.channel._delete(queue_name) + + def test_delete(self): + queue_name = "new_unittest_queue" + self.channel._new_queue(queue_name) + self.channel._delete(queue_name) + self.assertNotIn(queue_name, self.channel._queue_cache) + + def test_get_from_sqs(self): + # Test getting a single message + message = "my test message" + self.producer.publish(message) + results = self.channel._get_from_sqs(self.queue_name) + self.assertEquals(len(results), 1) + + # Now test getting many messages + for i in xrange(3): + message = "message: %s" % i + self.producer.publish(message) + + results = self.channel._get_from_sqs(self.queue_name, count=3) + self.assertEquals(len(results), 3) + + def test_get_with_empty_list(self): + self.assertRaises(five.Empty, self.channel._get, self.queue_name) + + def test_get_bulk_raises_empty(self): + self.assertRaises(five.Empty, self.channel._get_bulk, self.queue_name) + + def test_messages_to_python(self): + message_count = 3 + # Create several test messages and publish them + for i in range(message_count): + message = 'message: %s' % i + self.producer.publish(message) + + # Get the messages now + messages = self.channel._get_from_sqs( + self.queue_name, count=message_count, + ) + + # Now convert them to payloads + payloads = self.channel._messages_to_python( + messages, self.queue_name, + ) + + # We got the same number of payloads back, right? 
+ self.assertEquals(len(payloads), message_count) + + # Make sure they're payload-style objects + for p in payloads: + self.assertTrue('properties' in p) + + def test_put_and_get(self): + message = 'my test message' + self.producer.publish(message) + results = self.queue(self.channel).get().payload + self.assertEquals(message, results) + + def test_puts_and_gets(self): + for i in xrange(3): + message = 'message: %s' % i + self.producer.publish(message) + + for i in xrange(3): + self.assertEquals('message: %s' % i, + self.queue(self.channel).get().payload) + + def test_put_and_get_bulk(self): + # With QoS.prefetch_count = 0 + message = 'my test message' + self.producer.publish(message) + results = self.channel._get_bulk(self.queue_name) + self.assertEquals(1, len(results)) + + def test_puts_and_get_bulk(self): + # Generate 8 messages + message_count = 8 + + # Set the prefetch_count to 5 + self.channel.qos.prefetch_count = 5 + + # Now, generate all the messages + for i in xrange(message_count): + message = 'message: %s' % i + self.producer.publish(message) + + # Count how many messages are retrieved the first time. Should + # be 5 (message_count). + results = self.channel._get_bulk(self.queue_name) + self.assertEquals(5, len(results)) + + # Now, do the get again, the number of messages returned should be 3. 
+ results = self.channel._get_bulk(self.queue_name) + self.assertEquals(3, len(results)) + + def test_drain_events_with_empty_list(self): + def mock_can_consume(): + return False + self.channel.qos.can_consume = mock_can_consume + self.assertRaises(five.Empty, self.channel.drain_events) + + def test_drain_events_with_prefetch_5(self): + # Generate 20 messages + message_count = 20 + expected_get_message_count = 4 + + # Set the prefetch_count to 5 + self.channel.qos.prefetch_count = 5 + + # Now, generate all the messages + for i in xrange(message_count): + self.producer.publish('message: %s' % i) + + # Now drain all the events + for i in xrange(message_count): + self.channel.drain_events() + + # How many times was the SQSConnectionMock get_message method called? + self.assertEquals( + expected_get_message_count, + self.channel._queue_cache[self.queue_name]._get_message_calls) + + def test_drain_events_with_prefetch_none(self): + # Generate 20 messages + message_count = 20 + expected_get_message_count = 20 + + # Set the prefetch_count to None + self.channel.qos.prefetch_count = None + + # Now, generate all the messages + for i in xrange(message_count): + self.producer.publish('message: %s' % i) + + # Now drain all the events + for i in xrange(message_count): + self.channel.drain_events() + + # How many times was the SQSConnectionMock get_message method called? 
+ self.assertEquals( + expected_get_message_count, + self.channel._queue_cache[self.queue_name]._get_message_calls) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/tests/transport/test_redis.py new/kombu-3.0.8/kombu/tests/transport/test_redis.py --- old/kombu-3.0.6/kombu/tests/transport/test_redis.py 2013-11-21 16:31:10.000000000 +0100 +++ new/kombu-3.0.8/kombu/tests/transport/test_redis.py 2013-12-13 17:25:26.000000000 +0100 @@ -496,10 +496,22 @@ c.parse_response = Mock() self.channel._poll_error('BRPOP') - c.parse_response.assert_called_with('BRPOP') + c.parse_response.assert_called_with(c.connection, 'BRPOP') c.parse_response.side_effect = KeyError('foo') - self.assertIsNone(self.channel._poll_error('BRPOP')) + with self.assertRaises(KeyError): + self.channel._poll_error('BRPOP') + + def test_poll_error_on_type_LISTEN(self): + c = self.channel.subclient = Mock() + c.parse_response = Mock() + self.channel._poll_error('LISTEN') + + c.parse_response.assert_called_with() + + c.parse_response.side_effect = KeyError('foo') + with self.assertRaises(KeyError): + self.channel._poll_error('LISTEN') def test_put_fanout(self): self.channel._in_poll = False diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/transport/SQS.py new/kombu-3.0.8/kombu/transport/SQS.py --- old/kombu-3.0.6/kombu/transport/SQS.py 2013-11-15 13:50:31.000000000 +0100 +++ new/kombu-3.0.8/kombu/transport/SQS.py 2013-12-16 17:12:00.000000000 +0100 @@ -2,11 +2,44 @@ kombu.transport.SQS =================== -Amazon SQS transport. - +Amazon SQS transport module for Kombu. This package implements an AMQP-like +interface on top of Amazons SQS service, with the goal of being optimized for +high performance and reliability. + +The default settings for this module are focused now on high performance in +task queue situations where tasks are small, idempotent and run very fast. 
+ +SQS Features supported by this transport: + Long Polling: + http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ + sqs-long-polling.html + + Long polling is enabled by setting the `wait_time_seconds` transport + option to a number > 1. Amazon supports up to 20 seconds. This is + disabled for now, but will be enabled by default in the near future. + + Batch API Actions: + http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ + sqs-batch-api.html + + The default behavior of the SQS Channel.drain_events() method is to + request up to the 'prefetch_count' messages on every request to SQS. + These messages are stored locally in a deque object and passed back + to the Transport until the deque is empty, before triggering a new + API call to Amazon. + + This behavior dramatically speeds up the rate that you can pull tasks + from SQS when you have short-running tasks (or a large number of workers). + + When a Celery worker has multiple queues to monitor, it will pull down + up to 'prefetch_count' messages from queueA and work on them all before + moving on to queueB. If queueB is empty, it will wait up until + 'polling_interval' expires before moving back and checking on queueA. """ + from __future__ import absolute_import +import collections import socket import string @@ -22,11 +55,15 @@ from boto.sqs.message import Message from kombu.five import Empty, range, text_t +from kombu.log import get_logger from kombu.utils import cached_property, uuid from kombu.utils.encoding import bytes_to_str, safe_str +from kombu.transport.virtual import scheduling from . import virtual +logger = get_logger(__name__) + # dots are replaced by dash, all other punctuation # replaced by underscore. CHARS_REPLACE_TABLE = dict((ord(c), 0x5f) @@ -149,11 +186,18 @@ self._queue_cache[queue.name] = queue self._fanout_queues = set() + # The drain_events() method stores extra messages in a local + # Deque object. 
This allows multiple messages to be requested from + # SQS at once for performance, but maintains the same external API + # to the caller of the drain_events() method. + self._queue_message_cache = collections.deque() + def basic_consume(self, queue, no_ack, *args, **kwargs): if no_ack: self._noack_queues.add(queue) - return super(Channel, self).basic_consume(queue, no_ack, - *args, **kwargs) + return super(Channel, self).basic_consume( + queue, no_ack, *args, **kwargs + ) def basic_cancel(self, consumer_tag): if consumer_tag in self._consumers: @@ -161,12 +205,53 @@ self._noack_queues.discard(queue) return super(Channel, self).basic_cancel(consumer_tag) + def drain_events(self, timeout=None): + """Return a single payload message from one of our queues. + + :raises Empty: if no messages available. + + """ + # If we're not allowed to consume or have no consumers, raise Empty + if not self._consumers or not self.qos.can_consume(): + raise Empty() + message_cache = self._queue_message_cache + + # Check if there are any items in our buffer. If there are any, pop + # off that queue first. + try: + return message_cache.popleft() + except IndexError: + pass + + # At this point, go and get more messages from SQS + res, queue = self._poll(self.cycle, timeout=timeout) + message_cache.extend((r, queue) for r in res) + + # Now try to pop off the queue again. + try: + return message_cache.popleft() + except IndexError: + raise Empty() + + def _reset_cycle(self): + """Reset the consume cycle. + + :returns: a FairCycle object that points to our _get_bulk() method + rather than the standard _get() method. This allows for multiple + messages to be returned at once from SQS (based on the prefetch + limit). 
+ + """ + self._cycle = scheduling.FairCycle( + self._get_bulk, self._active_queues, Empty, + ) + def entity_name(self, name, table=CHARS_REPLACE_TABLE): """Format AMQP queue name into a legal SQS queue name.""" return text_t(safe_str(name)).translate(table) def _new_queue(self, queue, **kwargs): - """Ensures a queue exists in SQS.""" + """Ensure a queue with given name exists in SQS.""" # Translate to SQS name for consistency with initial # _queue_cache population. queue = self.entity_name(self.queue_name_prefix + queue) @@ -212,10 +297,10 @@ def _delete(self, queue, *args): """delete queue by name.""" - self._queue_cache.pop(queue, None) if self.supports_fanout: self.table.queue_delete(queue) super(Channel, self)._delete(queue) + self._queue_cache.pop(queue, None) def exchange_delete(self, exchange, **kwargs): """Delete exchange by name.""" @@ -241,23 +326,86 @@ for route in self.table.routes_for(exchange): self._put(route['queue'], message, **kwargs) - def _get(self, queue): - """Try to retrieve a single message off ``queue``.""" + def _get_from_sqs(self, queue, count=1): + """Retrieve messages from SQS and returns the raw SQS message objects. 
+ + :returns: List of SQS message objects + + """ q = self._new_queue(queue) if W_LONG_POLLING and queue not in self._fanout_queues: - rs = q.get_messages(1, wait_time_seconds=self.wait_time_seconds) + return q.get_messages( + count, wait_time_seconds=self.wait_time_seconds, + ) else: # boto < 2.8 - rs = q.get_messages(1) - if rs: - m = rs[0] - payload = loads(bytes_to_str(rs[0].get_body())) - if queue in self._noack_queues: - q.delete_message(m) - else: - payload['properties']['delivery_info'].update({ - 'sqs_message': m, 'sqs_queue': q, }) - return payload - raise Empty() + return q.get_messages(count) + + def _message_to_python(self, message, queue_name, queue): + payload = loads(bytes_to_str(message.get_body())) + if queue_name in self._noack_queues: + queue.delete_message(message) + else: + payload['properties']['delivery_info'].update({ + 'sqs_message': message, 'sqs_queue': queue, + }) + return payload + + def _messages_to_python(self, messages, queue): + """Convert a list of SQS Message objects into Payloads. + + This method handles converting SQS Message objects into + Payloads, and appropriately updating the queue depending on + the 'ack' settings for that queue. + + :param messages: A list of SQS Message objects. + :param queue: String name representing the queue they came from + + :returns: A list of Payload objects + + """ + q = self._new_queue(queue) + return [self._message_to_python(m, queue, q) for m in messages] + + def _get_bulk(self, queue, max_if_unlimited=10): + """Try to retrieve multiple messages off ``queue``. + + Where _get() returns a single Payload object, this method returns a + list of Payload objects. The number of objects returned is determined + by the total number of messages available in the queue and the + number of messages that the QoS object allows (based on the + prefetch_count). + + .. note:: + Ignores QoS limits so caller is responsible for checking + that we are allowed to consume at least one message from the + queue. 
get_bulk will then ask QoS for an estimate of + the number of extra messages that we can consume. + + args: + queue: The queue name (string) to pull from + + returns: + payloads: A list of payload objects returned + """ + # drain_events calls `can_consume` first, consuming + # a token, so we know that we are allowed to consume at least + # one message. + maxcount = self.qos.can_consume_max_estimate() + maxcount = max_if_unlimited if maxcount is None else max(maxcount, 1) + messages = self._get_from_sqs(queue, count=maxcount) + + if not messages: + raise Empty() + return self._messages_to_python(messages, queue) + + def _get(self, queue): + """Try to retrieve a single message off ``queue``.""" + messages = self._get_from_sqs(queue, count=1) + + if not messages: + raise Empty() + + return self._messages_to_python(messages, queue)[0] def _restore(self, message, unwanted_delivery_info=('sqs_message', 'sqs_queue')): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/transport/__init__.py new/kombu-3.0.8/kombu/transport/__init__.py --- old/kombu-3.0.6/kombu/transport/__init__.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/transport/__init__.py 2013-12-13 15:41:17.000000000 +0100 @@ -7,8 +7,6 @@ """ from __future__ import absolute_import -from collections import Callable - from kombu.five import string_t from kombu.syn import _detect_environment from kombu.utils import symbol_by_name @@ -29,7 +27,7 @@ def __inner(): import warnings - _new = isinstance(xxx, Callable) and xxx() or xxx + _new = callable(xxx) and xxx() or xxx gtransport = 'ghettoq.taproot.{0}'.format(name) ktransport = 'kombu.transport.{0}.Transport'.format(_new) this = alias or name @@ -89,7 +87,7 @@ transport, alt)) raise KeyError('No such transport: {0}'.format(transport)) else: - if isinstance(transport, Callable): + if callable(transport): transport = transport() return symbol_by_name(transport) return transport diff -urN 
'--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/transport/redis.py new/kombu-3.0.8/kombu/transport/redis.py --- old/kombu-3.0.6/kombu/transport/redis.py 2013-11-21 15:08:35.000000000 +0100 +++ new/kombu-3.0.8/kombu/transport/redis.py 2013-12-13 15:41:17.000000000 +0100 @@ -481,6 +481,7 @@ response = c.parse_response() except self.connection_errors: self._in_listen = False + raise Empty() if response is not None: payload = self._handle_message(c, response) if bytes_to_str(payload['type']) == 'message': @@ -527,10 +528,10 @@ self._in_poll = False def _poll_error(self, type, **options): - try: - self.client.parse_response(type) - except self.connection_errors: - pass + if type == 'LISTEN': + self.subclient.parse_response() + else: + self.client.parse_response(self.client.connection, type) def _get(self, queue): with self.conn_or_acquire() as client: @@ -635,14 +636,18 @@ if queue in self.auto_delete_queues: self.queue_delete(queue) - # Close connections - for attr in 'client', 'subclient': - try: - self.__dict__[attr].connection.disconnect() - except (KeyError, AttributeError, self.ResponseError): - pass + self._close_clients() + super(Channel, self).close() + def _close_clients(self): + # Close connections + for attr in 'client', 'subclient': + try: + self.__dict__[attr].connection.disconnect() + except (KeyError, AttributeError, self.ResponseError): + pass + def _prepare_virtual_host(self, vhost): if not isinstance(vhost, int): if not vhost or vhost == '/': @@ -832,7 +837,9 @@ return ( (virtual.Transport.connection_errors + ( InconsistencyError, - socket.timeout, + socket.error, + IOError, + OSError, exceptions.ConnectionError, exceptions.AuthenticationError)), (virtual.Transport.channel_errors + ( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/transport/virtual/__init__.py new/kombu-3.0.8/kombu/transport/virtual/__init__.py --- 
old/kombu-3.0.6/kombu/transport/virtual/__init__.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/transport/virtual/__init__.py 2013-12-13 17:25:22.000000000 +0100 @@ -23,7 +23,7 @@ from kombu.exceptions import ResourceError, ChannelError from kombu.five import Empty, items, monotonic -from kombu.utils import emergency_dump_state, say, uuid +from kombu.utils import emergency_dump_state, kwdict, say, uuid from kombu.utils.compat import OrderedDict from kombu.utils.encoding import str_to_bytes, bytes_to_str @@ -130,6 +130,27 @@ pcount = self.prefetch_count return not pcount or len(self._delivered) - len(self._dirty) < pcount + def can_consume_max_estimate(self): + """Returns the maximum number of messages allowed to be returned. + + Returns an estimated number of messages that a consumer may be allowed + to consume at once from the broker. This is used for services where + bulk 'get message' calls are preferred to many individual 'get message' + calls - like SQS. + + returns: + An integer > 0 + """ + pcount = self.prefetch_count + count = None + if pcount: + count = pcount - (len(self._delivered) - len(self._dirty)) + + if count < 1: + return 1 + + return count + def append(self, message, delivery_tag): """Append message to transactional state.""" if self._dirty: @@ -210,6 +231,15 @@ finally: state.restored = True + def restore_visible(self, *args, **kwargs): + """Restore any pending unackwnowledged messages for visibility_timeout + style implementations. + + Optional: Currently only used by the Redis transport. 
+ + """ + pass + class Message(base.Message): @@ -219,7 +249,7 @@ body = payload.get('body') if body: body = channel.decode_body(body, properties.get('body_encoding')) - fields = { + kwargs.update({ 'body': body, 'delivery_tag': properties['delivery_tag'], 'content_type': payload.get('content-type'), @@ -228,8 +258,8 @@ 'properties': properties, 'delivery_info': properties.get('delivery_info'), 'postencode': 'utf-8', - } - super(Message, self).__init__(channel, **dict(kwargs, **fields)) + }) + super(Message, self).__init__(channel, **kwdict(kwargs)) def serializable(self): props = self.properties diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/utils/__init__.py new/kombu-3.0.8/kombu/utils/__init__.py --- old/kombu-3.0.6/kombu/utils/__init__.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/utils/__init__.py 2013-12-16 17:19:58.000000000 +0100 @@ -13,6 +13,7 @@ from contextlib import contextmanager from itertools import count, repeat +from functools import wraps from time import sleep from uuid import UUID, uuid4 as _uuid4, _uuid_generate_random diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu/utils/debug.py new/kombu-3.0.8/kombu/utils/debug.py --- old/kombu-3.0.6/kombu/utils/debug.py 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/kombu/utils/debug.py 2013-12-13 15:41:17.000000000 +0100 @@ -9,7 +9,6 @@ import logging -from collections import Callable from functools import wraps from kombu.five import items @@ -37,7 +36,7 @@ def __getattr__(self, key): meth = getattr(self.instance, key) - if not isinstance(meth, Callable) or key in self.__ignore: + if not callable(meth) or key in self.__ignore: return meth @wraps(meth) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu.egg-info/PKG-INFO new/kombu-3.0.8/kombu.egg-info/PKG-INFO --- 
old/kombu-3.0.6/kombu.egg-info/PKG-INFO 2013-11-21 17:48:58.000000000 +0100 +++ new/kombu-3.0.8/kombu.egg-info/PKG-INFO 2013-12-16 18:35:31.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: kombu -Version: 3.0.6 +Version: 3.0.8 Summary: Messaging library for Python Home-page: http://kombu.readthedocs.org Author: Ask Solem @@ -12,7 +12,7 @@ kombu - Messaging library for Python ======================================== - :Version: 3.0.6 + :Version: 3.0.8 `Kombu` is a messaging library for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/kombu.egg-info/SOURCES.txt new/kombu-3.0.8/kombu.egg-info/SOURCES.txt --- old/kombu-3.0.6/kombu.egg-info/SOURCES.txt 2013-11-21 17:48:58.000000000 +0100 +++ new/kombu-3.0.8/kombu.egg-info/SOURCES.txt 2013-12-16 18:35:32.000000000 +0100 @@ -177,6 +177,7 @@ kombu/tests/async/__init__.py kombu/tests/async/test_hub.py kombu/tests/transport/__init__.py +kombu/tests/transport/test_SQS.py kombu/tests/transport/test_amqplib.py kombu/tests/transport/test_base.py kombu/tests/transport/test_filesystem.py @@ -243,6 +244,7 @@ requirements/pkgutils.txt requirements/py26.txt requirements/test-ci.txt +requirements/test-ci3.txt requirements/test.txt requirements/test3.txt requirements/extras/beanstalk.txt diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/requirements/test-ci.txt new/kombu-3.0.8/requirements/test-ci.txt --- old/kombu-3.0.6/requirements/test-ci.txt 2013-11-10 01:53:13.000000000 +0100 +++ new/kombu-3.0.8/requirements/test-ci.txt 2013-12-13 17:25:22.000000000 +0100 @@ -1,3 +1,4 @@ +boto coverage>=3.0 redis PyYAML diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/requirements/test-ci3.txt new/kombu-3.0.8/requirements/test-ci3.txt --- old/kombu-3.0.6/requirements/test-ci3.txt 1970-01-01 01:00:00.000000000 +0100 +++ new/kombu-3.0.8/requirements/test-ci3.txt 
2013-12-13 17:25:22.000000000 +0100 @@ -0,0 +1,4 @@ +coverage>=3.0 +redis +PyYAML +msgpack-python>0.2.0 # 0.2.0 dropped 2.5 support diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-3.0.6/setup.cfg new/kombu-3.0.8/setup.cfg --- old/kombu-3.0.6/setup.cfg 2013-11-21 17:49:02.000000000 +0100 +++ new/kombu-3.0.8/setup.cfg 2013-12-16 18:35:35.000000000 +0100 @@ -16,7 +16,6 @@ kombu.transport.couchdb kombu.transport.beanstalk kombu.transport.sqlalchemy* - kombu.transport.SQS kombu.transport.zookeeper kombu.transport.zmq kombu.transport.django*
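Note on the SQS change in the diff above: the new drain_events() serves messages from a local deque and only goes back to SQS when that buffer is empty, while _get_bulk() sizes each request from the QoS estimate. The following is a minimal standalone sketch of that pattern, not kombu's actual code. The names BufferedDrain, drain_one, fetch_bulk and bulk_size are hypothetical and introduced only for illustration, and the sketch assumes Python 3, where Empty comes from the standard queue module rather than kombu.five.

```python
import collections
from queue import Empty


def bulk_size(estimate, max_if_unlimited=10):
    """Pick a batch size from a prefetch estimate (hypothetical helper).

    Mirrors the expression in the diff's _get_bulk(): fall back to a
    default when there is no limit, otherwise request at least one message.
    """
    return max_if_unlimited if estimate is None else max(estimate, 1)


class BufferedDrain(object):
    """Hypothetical stand-in for a channel; this is not kombu's Channel."""

    def __init__(self, fetch_bulk):
        # fetch_bulk(max_count) must return a list of messages (may be empty).
        self._fetch_bulk = fetch_bulk
        self._cache = collections.deque()

    def drain_one(self, max_count=10):
        """Return one message, refilling the buffer with one bulk fetch."""
        # Serve from the local buffer first.
        try:
            return self._cache.popleft()
        except IndexError:
            pass

        # Buffer is empty: fetch a whole batch in a single round trip.
        self._cache.extend(self._fetch_bulk(max_count))

        # Try the buffer again; an empty batch means nothing is available.
        try:
            return self._cache.popleft()
        except IndexError:
            raise Empty()
```

With a fetch function that hands out at most max_count items per call, five messages drained with bulk_size(3) should need only two fetches, which is the round-trip saving the changelog entry "SQS: Improved performance by reading messages in bulk" refers to.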
