Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-kombu for openSUSE:Factory 
checked in at 2022-03-23 20:15:28
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-kombu (Old)
 and      /work/SRC/openSUSE:Factory/.python-kombu.new.25692 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-kombu"

Wed Mar 23 20:15:28 2022 rev:75 rq:963484 version:5.2.4

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-kombu/python-kombu.changes        
2022-01-11 21:20:42.089019255 +0100
+++ /work/SRC/openSUSE:Factory/.python-kombu.new.25692/python-kombu.changes     
2022-03-23 20:15:43.046372436 +0100
@@ -1,0 +2,20 @@
+Mon Mar 21 08:16:03 UTC 2022 - Dirk M??ller <dmuel...@suse.com>
+
+- update to 5.2.4:
+  * allow getting recoverable_connection_errors without an active transport
+  * [pre-commit.ci] pre-commit autoupdate
+  * prevent KeyError: 'purelib'
+  * Revert "try pining setuptools (#1466)"
+  * Fix issue #789: Async http code not allowing for proxy config
+  * The times of retrying is not correct
+  * Set redelivered property for Celery with Redis
+  * Some small updates
+  * Remove use of OrderedDict in various places
+  * Warn about missing hostname only when default one is available
+  * All supported versions of Python define package.
+  * Added global_keyprefix support for pubsub clients
+  * try pytest 7
+  * Add an option to not base64-encode SQS messages.
+  * SQS: Fix message arg in backoff policy
+
+-------------------------------------------------------------------

Old:
----
  kombu-5.2.3.tar.gz

New:
----
  kombu-5.2.4.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-kombu.spec ++++++
--- /var/tmp/diff_new_pack.uiwbFq/_old  2022-03-23 20:15:43.834372882 +0100
+++ /var/tmp/diff_new_pack.uiwbFq/_new  2022-03-23 20:15:43.838372883 +0100
@@ -19,7 +19,7 @@
 %{?!python_module:%define python_module() python-%{**} python3-%{**}}
 %define skip_python2 1
 Name:           python-kombu
-Version:        5.2.3
+Version:        5.2.4
 Release:        0
 Summary:        AMQP Messaging Framework for Python
 License:        BSD-3-Clause

++++++ kombu-5.2.3.tar.gz -> kombu-5.2.4.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/PKG-INFO new/kombu-5.2.4/PKG-INFO
--- old/kombu-5.2.3/PKG-INFO    2021-12-29 05:59:22.182823400 +0100
+++ new/kombu-5.2.4/PKG-INFO    2022-03-06 06:00:27.418216700 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: kombu
-Version: 5.2.3
+Version: 5.2.4
 Summary: Messaging library for Python.
 Home-page: https://kombu.readthedocs.io
 Author: Ask Solem
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/README.rst new/kombu-5.2.4/README.rst
--- old/kombu-5.2.3/README.rst  2021-12-29 05:55:15.000000000 +0100
+++ new/kombu-5.2.4/README.rst  2022-03-06 05:57:40.000000000 +0100
@@ -4,7 +4,7 @@
 
 |build-status| |coverage| |license| |wheel| |pyversion| |pyimp| |downloads|
 
-:Version: 5.2.3
+:Version: 5.2.4
 :Documentation: https://kombu.readthedocs.io/
 :Download: https://pypi.org/project/kombu/
 :Source: https://github.com/celery/kombu/
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/docs/includes/introduction.txt 
new/kombu-5.2.4/docs/includes/introduction.txt
--- old/kombu-5.2.3/docs/includes/introduction.txt      2021-12-29 
05:54:42.000000000 +0100
+++ new/kombu-5.2.4/docs/includes/introduction.txt      2022-03-06 
05:57:24.000000000 +0100
@@ -1,4 +1,4 @@
-:Version: 5.2.3
+:Version: 5.2.4
 :Web: https://kombu.readthedocs.io/
 :Download: https://pypi.org/project/kombu/
 :Source: https://github.com/celery/kombu/
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/docs/userguide/simple.rst 
new/kombu-5.2.4/docs/userguide/simple.rst
--- old/kombu-5.2.3/docs/userguide/simple.rst   2021-09-07 06:22:35.000000000 
+0200
+++ new/kombu-5.2.4/docs/userguide/simple.rst   2022-03-06 05:55:41.000000000 
+0100
@@ -61,7 +61,7 @@
     from kombu import Connection
 
 
-    class Logger(object):
+    class Logger:
 
         def __init__(self, connection, queue_name='log_queue',
                 serializer='json', compression=None):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/extra/requirements/pkgutils.txt 
new/kombu-5.2.4/extra/requirements/pkgutils.txt
--- old/kombu-5.2.3/extra/requirements/pkgutils.txt     2021-12-22 
04:35:30.000000000 +0100
+++ new/kombu-5.2.4/extra/requirements/pkgutils.txt     2022-03-06 
05:55:41.000000000 +0100
@@ -1,4 +1,4 @@
-setuptools>=59.1.1,<59.7.0
+setuptools>=47.0.0
 wheel>=0.29.0
 flake8>=2.5.4
 tox>=2.3.1
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/extra/requirements/test.txt 
new/kombu-5.2.4/extra/requirements/test.txt
--- old/kombu-5.2.3/extra/requirements/test.txt 2021-11-08 00:46:52.000000000 
+0100
+++ new/kombu-5.2.4/extra/requirements/test.txt 2022-03-06 05:55:41.000000000 
+0100
@@ -1,4 +1,4 @@
 pytz>dev
-pytest~=6.2
+pytest~=7.0.1
 pytest-sugar
 Pyro4
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/__init__.py 
new/kombu-5.2.4/kombu/__init__.py
--- old/kombu-5.2.3/kombu/__init__.py   2021-12-29 05:53:28.000000000 +0100
+++ new/kombu-5.2.4/kombu/__init__.py   2022-03-06 05:57:04.000000000 +0100
@@ -5,7 +5,7 @@
 import sys
 from collections import namedtuple
 
-__version__ = '5.2.3'
+__version__ = '5.2.4'
 __author__ = 'Ask Solem'
 __contact__ = 'auv...@gmail.com, a...@celeryproject.org'
 __homepage__ = 'https://kombu.readthedocs.io'
@@ -86,12 +86,6 @@
         return result
 
 
-# 2.5 does not define __package__
-try:
-    package = __package__
-except NameError:  # pragma: no cover
-    package = 'kombu'
-
 # keep a reference to this module so that it's not garbage collected
 old_module = sys.modules[__name__]
 
@@ -106,7 +100,7 @@
     '__contact__': __contact__,
     '__homepage__': __homepage__,
     '__docformat__': __docformat__,
-    '__package__': package,
+    '__package__': __package__,
     'version_info_t': version_info_t,
     'version_info': version_info,
     'VERSION': VERSION
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/asynchronous/aws/sqs/queue.py 
new/kombu-5.2.4/kombu/asynchronous/aws/sqs/queue.py
--- old/kombu-5.2.3/kombu/asynchronous/aws/sqs/queue.py 2021-09-07 
06:22:35.000000000 +0200
+++ new/kombu-5.2.4/kombu/asynchronous/aws/sqs/queue.py 2022-03-06 
05:55:41.000000000 +0100
@@ -12,7 +12,7 @@
     return rs[0] if len(rs) == 1 else None
 
 
-class AsyncQueue():
+class AsyncQueue:
     """Async SQS Queue."""
 
     def __init__(self, connection=None, url=None, message_class=AsyncMessage):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/asynchronous/http/base.py 
new/kombu-5.2.4/kombu/asynchronous/http/base.py
--- old/kombu-5.2.3/kombu/asynchronous/http/base.py     2021-09-07 
06:22:35.000000000 +0200
+++ new/kombu-5.2.4/kombu/asynchronous/http/base.py     2022-03-06 
05:55:41.000000000 +0100
@@ -61,7 +61,7 @@
         auth_password (str): Password for HTTP authentication.
         auth_mode (str): Type of HTTP authentication (``basic`` or ``digest``).
         user_agent (str): Custom user agent for this request.
-        network_interace (str): Network interface to use for this request.
+        network_interface (str): Network interface to use for this request.
         on_ready (Callable): Callback to be called when the response has been
             received. Must accept single ``response`` argument.
         on_stream (Callable): Optional callback to be called every time body
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/asynchronous/http/curl.py 
new/kombu-5.2.4/kombu/asynchronous/http/curl.py
--- old/kombu-5.2.3/kombu/asynchronous/http/curl.py     2021-09-07 
06:22:35.000000000 +0200
+++ new/kombu-5.2.4/kombu/asynchronous/http/curl.py     2022-03-06 
05:55:41.000000000 +0100
@@ -231,9 +231,6 @@
             if request.proxy_username:
                 setopt(_pycurl.PROXYUSERPWD, '{}:{}'.format(
                     request.proxy_username, request.proxy_password or ''))
-        else:
-            setopt(_pycurl.PROXY, '')
-            curl.unsetopt(_pycurl.PROXYUSERPWD)
 
         setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0)
         setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/asynchronous/semaphore.py 
new/kombu-5.2.4/kombu/asynchronous/semaphore.py
--- old/kombu-5.2.3/kombu/asynchronous/semaphore.py     2021-09-07 
06:22:35.000000000 +0200
+++ new/kombu-5.2.4/kombu/asynchronous/semaphore.py     2022-03-06 
05:55:41.000000000 +0100
@@ -12,18 +12,15 @@
     range even if released more times than it was acquired.
 
     Example:
-        >>> from future import print_statement as printf
-        # ^ ignore: just fooling stupid pyflakes
-
         >>> x = LaxBoundedSemaphore(2)
 
-        >>> x.acquire(printf, 'HELLO 1')
+        >>> x.acquire(print, 'HELLO 1')
         HELLO 1
 
-        >>> x.acquire(printf, 'HELLO 2')
+        >>> x.acquire(print, 'HELLO 2')
         HELLO 2
 
-        >>> x.acquire(printf, 'HELLO 3')
+        >>> x.acquire(print, 'HELLO 3')
         >>> x._waiters   # private, do not access directly
         [print, ('HELLO 3',)]
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/connection.py 
new/kombu-5.2.4/kombu/connection.py
--- old/kombu-5.2.3/kombu/connection.py 2021-12-29 05:15:42.000000000 +0100
+++ new/kombu-5.2.4/kombu/connection.py 2022-03-06 05:55:41.000000000 +0100
@@ -2,7 +2,6 @@
 
 import os
 import socket
-from collections import OrderedDict
 from contextlib import contextmanager
 from itertools import count, cycle
 from operator import itemgetter
@@ -529,7 +528,7 @@
                             # the error if it persists after a new connection
                             # was successfully established.
                             raise
-                        if max_retries is not None and retries > max_retries:
+                        if max_retries is not None and retries >= max_retries:
                             raise
                         self._debug('ensure connection error: %r',
                                     exc, exc_info=1)
@@ -626,7 +625,7 @@
                 transport_cls, transport_cls)
         D = self.transport.default_connection_params
 
-        if not self.hostname:
+        if not self.hostname and D.get('hostname'):
             logger.warning(
                 "No hostname was supplied. "
                 f"Reverting to default '{D.get('hostname')}'")
@@ -658,7 +657,7 @@
 
     def info(self):
         """Get connection info."""
-        return OrderedDict(self._info())
+        return dict(self._info())
 
     def __eqhash__(self):
         return HashedSeq(self.transport_cls, self.hostname, self.userid,
@@ -932,7 +931,7 @@
         but where the connection must be closed and re-established first.
         """
         try:
-            return self.transport.recoverable_connection_errors
+            return self.get_transport_cls().recoverable_connection_errors
         except AttributeError:
             # There were no such classification before,
             # and all errors were assumed to be recoverable,
@@ -948,19 +947,19 @@
         recovered from without re-establishing the connection.
         """
         try:
-            return self.transport.recoverable_channel_errors
+            return self.get_transport_cls().recoverable_channel_errors
         except AttributeError:
             return ()
 
     @cached_property
     def connection_errors(self):
         """List of exceptions that may be raised by the connection."""
-        return self.transport.connection_errors
+        return self.get_transport_cls().connection_errors
 
     @cached_property
     def channel_errors(self):
         """List of exceptions that may be raised by the channel."""
-        return self.transport.channel_errors
+        return self.get_transport_cls().channel_errors
 
     @property
     def supports_heartbeats(self):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/transport/SQS.py 
new/kombu-5.2.4/kombu/transport/SQS.py
--- old/kombu-5.2.3/kombu/transport/SQS.py      2021-12-19 06:12:32.000000000 
+0100
+++ new/kombu-5.2.4/kombu/transport/SQS.py      2022-03-06 05:55:41.000000000 
+0100
@@ -215,8 +215,8 @@
                 VisibilityTimeout=policy_value
             )
 
-    @staticmethod
-    def extract_task_name_and_number_of_retries(message):
+    def extract_task_name_and_number_of_retries(self, delivery_tag):
+        message = self._delivered[delivery_tag]
         message_headers = message.headers
         task_name = message_headers['task']
         number_of_retries = int(
@@ -395,8 +395,11 @@
     def _put(self, queue, message, **kwargs):
         """Put message onto queue."""
         q_url = self._new_queue(queue)
-        kwargs = {'QueueUrl': q_url,
-                  'MessageBody': AsyncMessage().encode(dumps(message))}
+        if self.sqs_base64_encoding:
+            body = AsyncMessage().encode(dumps(message))
+        else:
+            body = dumps(message)
+        kwargs = {'QueueUrl': q_url, 'MessageBody': body}
         if queue.endswith('.fifo'):
             if 'MessageGroupId' in message['properties']:
                 kwargs['MessageGroupId'] = \
@@ -420,22 +423,19 @@
             c.send_message(**kwargs)
 
     @staticmethod
-    def __b64_encoded(byte_string):
+    def _optional_b64_decode(byte_string):
         try:
-            return base64.b64encode(
-                base64.b64decode(byte_string)
-            ) == byte_string
+            data = base64.b64decode(byte_string)
+            if base64.b64encode(data) == byte_string:
+                return data
+            # else the base64 module found some embedded base64 content
+            # that should be ignored.
         except Exception:  # pylint: disable=broad-except
-            return False
-
-    def _message_to_python(self, message, queue_name, queue):
-        body = message['Body'].encode()
-        try:
-            if self.__b64_encoded(body):
-                body = base64.b64decode(body)
-        except TypeError:
             pass
+        return byte_string
 
+    def _message_to_python(self, message, queue_name, queue):
+        body = self._optional_b64_decode(message['Body'].encode())
         payload = loads(bytes_to_str(body))
         if queue_name in self._noack_queues:
             queue = self._new_queue(queue_name)
@@ -837,6 +837,10 @@
         return self.transport_options.get('wait_time_seconds',
                                           self.default_wait_time_seconds)
 
+    @cached_property
+    def sqs_base64_encoding(self):
+        return self.transport_options.get('sqs_base64_encoding', True)
+
 
 class Transport(virtual.Transport):
     """SQS Transport.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/transport/consul.py 
new/kombu-5.2.4/kombu/transport/consul.py
--- old/kombu-5.2.3/kombu/transport/consul.py   2021-09-07 06:22:35.000000000 
+0200
+++ new/kombu-5.2.4/kombu/transport/consul.py   2022-01-11 06:25:58.000000000 
+0100
@@ -276,24 +276,25 @@
     driver_type = 'consul'
     driver_name = 'consul'
 
-    def __init__(self, *args, **kwargs):
-        if consul is None:
-            raise ImportError('Missing python-consul library')
-
-        super().__init__(*args, **kwargs)
-
-        self.connection_errors = (
+    if consul:
+        connection_errors = (
             virtual.Transport.connection_errors + (
                 consul.ConsulException, consul.base.ConsulException
             )
         )
 
-        self.channel_errors = (
+        channel_errors = (
             virtual.Transport.channel_errors + (
                 consul.ConsulException, consul.base.ConsulException
             )
         )
 
+    def __init__(self, *args, **kwargs):
+        if consul is None:
+            raise ImportError('Missing python-consul library')
+
+        super().__init__(*args, **kwargs)
+
     def verify_connection(self, connection):
         port = connection.client.port or self.default_port
         host = connection.client.hostname or DEFAULT_HOST
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/transport/etcd.py 
new/kombu-5.2.4/kombu/transport/etcd.py
--- old/kombu-5.2.3/kombu/transport/etcd.py     2021-09-07 06:22:35.000000000 
+0200
+++ new/kombu-5.2.4/kombu/transport/etcd.py     2022-01-11 06:25:58.000000000 
+0100
@@ -242,6 +242,15 @@
     implements = virtual.Transport.implements.extend(
         exchange_type=frozenset(['direct']))
 
+    if etcd:
+        connection_errors = (
+            virtual.Transport.connection_errors + (etcd.EtcdException, )
+        )
+
+        channel_errors = (
+            virtual.Transport.channel_errors + (etcd.EtcdException, )
+        )
+
     def __init__(self, *args, **kwargs):
         """Create a new instance of etcd.Transport."""
         if etcd is None:
@@ -249,14 +258,6 @@
 
         super().__init__(*args, **kwargs)
 
-        self.connection_errors = (
-            virtual.Transport.connection_errors + (etcd.EtcdException, )
-        )
-
-        self.channel_errors = (
-            virtual.Transport.channel_errors + (etcd.EtcdException, )
-        )
-
     def verify_connection(self, connection):
         """Verify the connection works."""
         port = connection.client.port or self.default_port
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/transport/qpid.py 
new/kombu-5.2.4/kombu/transport/qpid.py
--- old/kombu-5.2.3/kombu/transport/qpid.py     2021-09-07 06:22:35.000000000 
+0200
+++ new/kombu-5.2.4/kombu/transport/qpid.py     2022-03-06 05:55:41.000000000 
+0100
@@ -92,7 +92,6 @@
 import ssl
 import sys
 import uuid
-from collections import OrderedDict
 from gettext import gettext as _
 from queue import Empty
 from time import monotonic
@@ -189,7 +188,7 @@
     def __init__(self, session, prefetch_count=1):
         self.session = session
         self.prefetch_count = 1
-        self._not_yet_acked = OrderedDict()
+        self._not_yet_acked = {}
 
     def can_consume(self):
         """Return True if the :class:`Channel` can consume more messages.
@@ -229,8 +228,8 @@
         """Append message to the list of un-ACKed messages.
 
         Add a message, referenced by the delivery_tag, for ACKing,
-        rejecting, or getting later. Messages are saved into an
-        :class:`collections.OrderedDict` by delivery_tag.
+        rejecting, or getting later. Messages are saved into a
+        dict by delivery_tag.
 
         :param message: A received message that has not yet been ACKed.
         :type message: qpid.messaging.Message
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/transport/redis.py 
new/kombu-5.2.4/kombu/transport/redis.py
--- old/kombu-5.2.3/kombu/transport/redis.py    2021-12-29 05:15:42.000000000 
+0100
+++ new/kombu-5.2.4/kombu/transport/redis.py    2022-03-06 05:55:41.000000000 
+0100
@@ -216,8 +216,7 @@
 
         if command in self.PREFIXED_SIMPLE_COMMANDS:
             args[0] = self.global_keyprefix + str(args[0])
-
-        if command in self.PREFIXED_COMPLEX_COMMANDS.keys():
+        elif command in self.PREFIXED_COMPLEX_COMMANDS:
             args_start = self.PREFIXED_COMPLEX_COMMANDS[command]["args_start"]
             args_end = self.PREFIXED_COMPLEX_COMMANDS[command]["args_end"]
 
@@ -267,6 +266,13 @@
         self.global_keyprefix = kwargs.pop('global_keyprefix', '')
         redis.Redis.__init__(self, *args, **kwargs)
 
+    def pubsub(self, **kwargs):
+        return PrefixedRedisPubSub(
+            self.connection_pool,
+            global_keyprefix=self.global_keyprefix,
+            **kwargs,
+        )
+
 
 class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):
     """Custom Redis pipeline that takes global_keyprefix into consideration.
@@ -281,6 +287,58 @@
         redis.client.Pipeline.__init__(self, *args, **kwargs)
 
 
+class PrefixedRedisPubSub(redis.client.PubSub):
+    """Redis pubsub client that takes global_keyprefix into consideration."""
+
+    PUBSUB_COMMANDS = (
+        "SUBSCRIBE",
+        "UNSUBSCRIBE",
+        "PSUBSCRIBE",
+        "PUNSUBSCRIBE",
+    )
+
+    def __init__(self, *args, **kwargs):
+        self.global_keyprefix = kwargs.pop('global_keyprefix', '')
+        super().__init__(*args, **kwargs)
+
+    def _prefix_args(self, args):
+        args = list(args)
+        command = args.pop(0)
+
+        if command in self.PUBSUB_COMMANDS:
+            args = [
+                self.global_keyprefix + str(arg)
+                for arg in args
+            ]
+
+        return [command, *args]
+
+    def parse_response(self, *args, **kwargs):
+        """Parse a response from the Redis server.
+
+        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
+        returned by redis command.
+        """
+        ret = super().parse_response(*args, **kwargs)
+        if ret is None:
+            return ret
+
+        # response formats
+        # SUBSCRIBE and UNSUBSCRIBE
+        #  -> [message type, channel, message]
+        # PSUBSCRIBE and PUNSUBSCRIBE
+        #  -> [message type, pattern, channel, message]
+        message_type, *channels, message = ret
+        return [
+            message_type,
+            *[channel[len(self.global_keyprefix):] for channel in channels],
+            message,
+        ]
+
+    def execute_command(self, *args, **kwargs):
+        return super().execute_command(*self._prefix_args(args), **kwargs)
+
+
 class QoS(virtual.QoS):
     """Redis Ack Emulation."""
 
@@ -717,6 +775,7 @@
         try:
             try:
                 payload['headers']['redelivered'] = True
+                payload['properties']['delivery_info']['redelivered'] = True
             except KeyError:
                 pass
             for queue in self._lookup(exchange, routing_key):
@@ -1214,13 +1273,14 @@
         exchange_type=frozenset(['direct', 'topic', 'fanout'])
     )
 
+    if redis:
+        connection_errors, channel_errors = get_redis_error_classes()
+
     def __init__(self, *args, **kwargs):
         if redis is None:
             raise ImportError('Missing redis library (pip install redis)')
         super().__init__(*args, **kwargs)
 
-        # Get redis-py exceptions.
-        self.connection_errors, self.channel_errors = self._get_errors()
         # All channels share the same poller.
         self.cycle = MultiChannelPoller()
 
@@ -1265,10 +1325,6 @@
         """Handle AIO event for one of our file descriptors."""
         self.cycle.on_readable(fileno)
 
-    def _get_errors(self):
-        """Utility to import redis-py's exceptions at runtime."""
-        return get_redis_error_classes()
-
 
 if sentinel:
     class SentinelManagedSSLConnection(
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu/transport/virtual/base.py 
new/kombu-5.2.4/kombu/transport/virtual/base.py
--- old/kombu-5.2.3/kombu/transport/virtual/base.py     2021-12-29 
05:15:42.000000000 +0100
+++ new/kombu-5.2.4/kombu/transport/virtual/base.py     2022-03-06 
05:55:41.000000000 +0100
@@ -177,6 +177,8 @@
         self.channel = channel
         self.prefetch_count = prefetch_count or 0
 
+        # Standard Python dictionaries do not support setting attributes
+        # on the object, hence the use of OrderedDict
         self._delivered = OrderedDict()
         self._delivered.restored = False
         self._dirty = set()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/kombu.egg-info/PKG-INFO 
new/kombu-5.2.4/kombu.egg-info/PKG-INFO
--- old/kombu-5.2.3/kombu.egg-info/PKG-INFO     2021-12-29 05:59:21.000000000 
+0100
+++ new/kombu-5.2.4/kombu.egg-info/PKG-INFO     2022-03-06 06:00:27.000000000 
+0100
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: kombu
-Version: 5.2.3
+Version: 5.2.4
 Summary: Messaging library for Python.
 Home-page: https://kombu.readthedocs.io
 Author: Ask Solem
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/requirements/pkgutils.txt 
new/kombu-5.2.4/requirements/pkgutils.txt
--- old/kombu-5.2.3/requirements/pkgutils.txt   2021-12-22 04:35:30.000000000 
+0100
+++ new/kombu-5.2.4/requirements/pkgutils.txt   2022-03-06 05:55:41.000000000 
+0100
@@ -1,4 +1,4 @@
-setuptools>=59.1.1,<59.7.0
+setuptools>=47.0.0
 wheel>=0.29.0
 flake8>=2.5.4
 tox>=2.3.1
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/requirements/test.txt 
new/kombu-5.2.4/requirements/test.txt
--- old/kombu-5.2.3/requirements/test.txt       2021-11-08 00:46:52.000000000 
+0100
+++ new/kombu-5.2.4/requirements/test.txt       2022-03-06 05:55:41.000000000 
+0100
@@ -1,4 +1,4 @@
 pytz>dev
-pytest~=6.2
+pytest~=7.0.1
 pytest-sugar
 Pyro4
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/setup.py new/kombu-5.2.4/setup.py
--- old/kombu-5.2.3/setup.py    2021-11-08 00:46:52.000000000 +0100
+++ new/kombu-5.2.4/setup.py    2022-03-06 05:55:41.000000000 +0100
@@ -2,7 +2,6 @@
 import os
 import re
 import sys
-from distutils.command.install import INSTALL_SCHEMES
 
 import setuptools
 import setuptools.command.test
@@ -56,9 +55,6 @@
     return fullsplit(head, [tail] + result)
 
 
-for scheme in list(INSTALL_SCHEMES.values()):
-    scheme['data'] = scheme['purelib']
-
 # if os.path.exists('README.rst'):
 #    long_description = codecs.open('README.rst', 'r', 'utf-8').read()
 # else:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/t/integration/test_redis.py 
new/kombu-5.2.4/t/integration/test_redis.py
--- old/kombu-5.2.3/t/integration/test_redis.py 2021-12-19 06:12:32.000000000 
+0100
+++ new/kombu-5.2.4/t/integration/test_redis.py 2022-01-11 06:25:58.000000000 
+0100
@@ -5,6 +5,7 @@
 import redis
 
 import kombu
+from kombu.transport.redis import Transport
 
 from .common import (BaseExchangeTypes, BaseMessage, BasePriority,
                      BasicFunctionality)
@@ -56,7 +57,11 @@
 @pytest.mark.env('redis')
 @pytest.mark.flaky(reruns=5, reruns_delay=2)
 class test_RedisBasicFunctionality(BasicFunctionality):
-    pass
+    def test_failed_connection__ConnectionError(self, invalid_connection):
+        # method raises transport exception
+        with pytest.raises(redis.exceptions.ConnectionError) as ex:
+            invalid_connection.connection
+        assert ex.type in Transport.connection_errors
 
 
 @pytest.mark.env('redis')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/t/unit/asynchronous/http/test_curl.py 
new/kombu-5.2.4/t/unit/asynchronous/http/test_curl.py
--- old/kombu-5.2.3/t/unit/asynchronous/http/test_curl.py       2021-09-07 
06:22:35.000000000 +0200
+++ new/kombu-5.2.4/t/unit/asynchronous/http/test_curl.py       2022-03-06 
05:55:41.000000000 +0100
@@ -1,4 +1,5 @@
-from unittest.mock import Mock, call, patch
+from io import BytesIO
+from unittest.mock import ANY, Mock, call, patch
 
 import pytest
 
@@ -131,3 +132,24 @@
             x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN)
             x.on_writable(fd, _pycurl=_pycurl)
             x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT)
+
+    def test_setup_request_sets_proxy_when_specified(self):
+        with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
+            x = self.Client()
+            proxy_host = 'http://www.example.com'
+            request = Mock(
+                name='request', headers={}, auth_mode=None, proxy_host=None
+            )
+            proxied_request = Mock(
+                name='request', headers={}, auth_mode=None,
+                proxy_host=proxy_host, proxy_port=123
+            )
+            x._setup_request(
+                x.Curl, request, BytesIO(), x.Headers(), _pycurl=_pycurl
+            )
+            with pytest.raises(AssertionError):
+                x.Curl.setopt.assert_any_call(_pycurl.PROXY, ANY)
+            x._setup_request(
+                x.Curl, proxied_request, BytesIO(), x.Headers(), _pycurl
+            )
+            x.Curl.setopt.assert_any_call(_pycurl.PROXY, proxy_host)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/t/unit/test_connection.py 
new/kombu-5.2.4/t/unit/test_connection.py
--- old/kombu-5.2.3/t/unit/test_connection.py   2021-12-29 05:15:42.000000000 
+0100
+++ new/kombu-5.2.4/t/unit/test_connection.py   2022-03-06 05:55:41.000000000 
+0100
@@ -99,6 +99,19 @@
         # see Appendix A of http://www.rabbitmq.com/uri-spec.html
         self.assert_info(Connection(url), **expected)
 
+    @pytest.mark.parametrize('url,expected', [
+        ('sqs://user:pass@',
+         {'userid': None, 'password': None, 'hostname': None,
+          'port': None, 'virtual_host': '/'}),
+        ('sqs://',
+         {'userid': None, 'password': None, 'hostname': None,
+          'port': None, 'virtual_host': '/'}),
+    ])
+    def test_sqs_example_urls(self, url, expected, caplog):
+        pytest.importorskip('boto3')
+        self.assert_info(Connection('sqs://'), **expected)
+        assert not caplog.records
+
     @pytest.mark.skip('TODO: urllib cannot parse ipv6 urls')
     def test_url_IPV6(self):
         self.assert_info(
@@ -293,7 +306,9 @@
         assert not c.is_evented
 
     def test_register_with_event_loop(self):
-        c = Connection(transport=Mock)
+        transport = Mock(name='transport')
+        transport.connection_errors = []
+        c = Connection(transport=transport)
         loop = Mock(name='loop')
         c.register_with_event_loop(loop)
         c.transport.register_with_event_loop.assert_called_with(
@@ -477,7 +492,7 @@
         def publish():
             raise _ConnectionError('failed connection')
 
-        self.conn.transport.connection_errors = (_ConnectionError,)
+        self.conn.get_transport_cls().connection_errors = (_ConnectionError,)
         ensured = self.conn.ensure(self.conn, publish)
         with pytest.raises(OperationalError):
             ensured()
@@ -485,7 +500,7 @@
     def test_autoretry(self):
         myfun = Mock()
 
-        self.conn.transport.connection_errors = (KeyError,)
+        self.conn.get_transport_cls().connection_errors = (KeyError,)
 
         def on_call(*args, **kwargs):
             myfun.side_effect = None
@@ -571,6 +586,18 @@
         conn = Connection(transport=MyTransport)
         assert conn.channel_errors == (KeyError, ValueError)
 
+    def test_channel_errors__exception_no_cache(self):
+        """Ensure the channel_errors can be retrieved without an initialized
+        transport.
+        """
+
+        class MyTransport(Transport):
+            channel_errors = (KeyError,)
+
+        conn = Connection(transport=MyTransport)
+        MyTransport.__init__ = Mock(side_effect=Exception)
+        assert conn.channel_errors == (KeyError,)
+
     def test_connection_errors(self):
 
         class MyTransport(Transport):
@@ -579,6 +606,80 @@
         conn = Connection(transport=MyTransport)
         assert conn.connection_errors == (KeyError, ValueError)
 
+    def test_connection_errors__exception_no_cache(self):
+        """Ensure the connection_errors can be retrieved without an
+        initialized transport.
+        """
+
+        class MyTransport(Transport):
+            connection_errors = (KeyError,)
+
+        conn = Connection(transport=MyTransport)
+        MyTransport.__init__ = Mock(side_effect=Exception)
+        assert conn.connection_errors == (KeyError,)
+
+    def test_recoverable_connection_errors(self):
+
+        class MyTransport(Transport):
+            recoverable_connection_errors = (KeyError, ValueError)
+
+        conn = Connection(transport=MyTransport)
+        assert conn.recoverable_connection_errors == (KeyError, ValueError)
+
+    def test_recoverable_connection_errors__fallback(self):
+        """Ensure missing recoverable_connection_errors on the Transport does
+        not cause a fatal error.
+        """
+
+        class MyTransport(Transport):
+            connection_errors = (KeyError,)
+            channel_errors = (ValueError,)
+
+        conn = Connection(transport=MyTransport)
+        assert conn.recoverable_connection_errors == (KeyError, ValueError)
+
+    def test_recoverable_connection_errors__exception_no_cache(self):
+        """Ensure the recoverable_connection_errors can be retrieved without
+        an initialized transport.
+        """
+
+        class MyTransport(Transport):
+            recoverable_connection_errors = (KeyError,)
+
+        conn = Connection(transport=MyTransport)
+        MyTransport.__init__ = Mock(side_effect=Exception)
+        assert conn.recoverable_connection_errors == (KeyError,)
+
+    def test_recoverable_channel_errors(self):
+
+        class MyTransport(Transport):
+            recoverable_channel_errors = (KeyError, ValueError)
+
+        conn = Connection(transport=MyTransport)
+        assert conn.recoverable_channel_errors == (KeyError, ValueError)
+
+    def test_recoverable_channel_errors__fallback(self):
+        """Ensure missing recoverable_channel_errors on the Transport does not
+        cause a fatal error.
+        """
+
+        class MyTransport(Transport):
+            pass
+
+        conn = Connection(transport=MyTransport)
+        assert conn.recoverable_channel_errors == ()
+
+    def test_recoverable_channel_errors__exception_no_cache(self):
+        """Ensure the recoverable_channel_errors can be retrieved without an
+        initialized transport.
+        """
+        class MyTransport(Transport):
+            recoverable_channel_errors = (KeyError,)
+
+        conn = Connection(transport=MyTransport)
+        MyTransport.__init__ = Mock(side_effect=Exception)
+        assert conn.recoverable_channel_errors == (KeyError,)
+
     def test_multiple_urls_hostname(self):
         conn = Connection(['example.com;amqp://example.com'])
         assert conn.as_uri() == 'amqp://guest:**@example.com:5672//'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/t/unit/transport/test_SQS.py 
new/kombu-5.2.4/t/unit/transport/test_SQS.py
--- old/kombu-5.2.3/t/unit/transport/test_SQS.py        2021-12-19 
06:12:32.000000000 +0100
+++ new/kombu-5.2.4/t/unit/transport/test_SQS.py        2022-03-06 
05:55:41.000000000 +0100
@@ -336,13 +336,13 @@
         with pytest.raises(Empty):
             self.channel._get_bulk(self.queue_name)
 
-    def test_is_base64_encoded(self):
+    def test_optional_b64_decode(self):
         raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": 
"celery.task.PingTask",' \
               b'"args": [],"kwargs": {},"retries": 0,"eta": 
"2009-11-17T12:30:56.527191"}'     # noqa
         b64_enc = base64.b64encode(raw)
-        assert self.channel._Channel__b64_encoded(b64_enc)
-        assert not self.channel._Channel__b64_encoded(raw)
-        assert not self.channel._Channel__b64_encoded(b"test123")
+        assert self.channel._optional_b64_decode(b64_enc) == raw
+        assert self.channel._optional_b64_decode(raw) == raw
+        assert self.channel._optional_b64_decode(b"test123") == b"test123"
 
     def test_messages_to_python(self):
         from kombu.asynchronous.aws.sqs.message import Message
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/t/unit/transport/test_qpid.py 
new/kombu-5.2.4/t/unit/transport/test_qpid.py
--- old/kombu-5.2.3/t/unit/transport/test_qpid.py       2021-09-07 
06:22:35.000000000 +0200
+++ new/kombu-5.2.4/t/unit/transport/test_qpid.py       2022-03-06 
05:55:41.000000000 +0100
@@ -4,7 +4,6 @@
 import sys
 import time
 import uuid
-from collections import OrderedDict
 from collections.abc import Callable
 from itertools import count
 from queue import Empty
@@ -33,7 +32,7 @@
     """
 
     def __init__(self, code=None, text=None):
-        super(Exception, self).__init__(self)
+        super().__init__(self)
         self.code = code
         self.text = text
 
@@ -57,7 +56,7 @@
         assert qos_limit_two.prefetch_count == 1
 
     def test__init___not_yet_acked_is_initialized(self):
-        assert isinstance(self.qos._not_yet_acked, OrderedDict)
+        assert isinstance(self.qos._not_yet_acked, dict)
 
 
 @pytest.mark.skip(reason='Not supported in Python3')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-5.2.3/t/unit/transport/test_redis.py 
new/kombu-5.2.4/t/unit/transport/test_redis.py
--- old/kombu-5.2.3/t/unit/transport/test_redis.py      2021-12-29 
05:15:42.000000000 +0100
+++ new/kombu-5.2.4/t/unit/transport/test_redis.py      2022-03-06 
05:55:41.000000000 +0100
@@ -1,3 +1,5 @@
+import base64
+import copy
 import socket
 import types
 from collections import defaultdict
@@ -269,9 +271,8 @@
 
 class Transport(redis.Transport):
     Channel = Channel
-
-    def _get_errors(self):
-        return ((KeyError,), (IndexError,))
+    connection_errors = (KeyError,)
+    channel_errors = (IndexError,)
 
 
 class test_Channel:
@@ -400,6 +401,67 @@
             )
             crit.assert_called()
 
+    def test_do_restore_message_celery(self):
+        # Payload value from real Celery project
+        payload = {
+            "body": base64.b64encode(dumps([
+                [],
+                {},
+                {
+                    "callbacks": None,
+                    "errbacks": None,
+                    "chain": None,
+                    "chord": None,
+                },
+            ]).encode()).decode(),
+            "content-encoding": "utf-8",
+            "content-type": "application/json",
+            "headers": {
+                "lang": "py",
+                "task": "common.tasks.test_task",
+                "id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+                "shadow": None,
+                "eta": None,
+                "expires": None,
+                "group": None,
+                "group_index": None,
+                "retries": 0,
+                "timelimit": [None, None],
+                "root_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+                "parent_id": None,
+                "argsrepr": "()",
+                "kwargsrepr": "{}",
+                "origin": "gen3437@Desktop",
+                "ignore_result": False,
+            },
+            "properties": {
+                "correlation_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+                "reply_to": "512f2489-ca40-3585-bc10-9b801a981782",
+                "delivery_mode": 2,
+                "delivery_info": {
+                    "exchange": "",
+                    "routing_key": "celery",
+                },
+                "priority": 0,
+                "body_encoding": "base64",
+                "delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f",
+            },
+        }
+        result_payload = copy.deepcopy(payload)
+        result_payload['headers']['redelivered'] = True
+        result_payload['properties']['delivery_info']['redelivered'] = True
+        queue = 'celery'
+
+        client = Mock(name='client')
+        lookup = self.channel._lookup = Mock(name='_lookup')
+        lookup.return_value = [queue]
+
+        self.channel._do_restore_message(
+            payload, 'exchange', 'routing_key', client,
+        )
+
+        client.rpush.assert_called_with(queue, dumps(result_payload))
+
     def test_restore_no_messages(self):
         message = Mock(name='message')
 
@@ -907,15 +969,22 @@
         redis.Transport.on_readable(transport, 13)
         cycle.on_readable.assert_called_with(13)
 
-    def test_transport_get_errors(self):
-        assert redis.Transport._get_errors(self.connection.transport)
+    def test_transport_connection_errors(self):
+        """Ensure connection_errors are populated."""
+        assert redis.Transport.connection_errors
+
+    def test_transport_channel_errors(self):
+        """Ensure connection_errors are populated."""
+        assert redis.Transport.channel_errors
 
     def test_transport_driver_version(self):
         assert redis.Transport.driver_version(self.connection.transport)
 
-    def test_transport_get_errors_when_InvalidData_used(self):
+    def test_transport_errors_when_InvalidData_used(self):
         from redis import exceptions
 
+        from kombu.transport.redis import get_redis_error_classes
+
         class ID(Exception):
             pass
 
@@ -924,7 +993,7 @@
         exceptions.InvalidData = ID
         exceptions.DataError = None
         try:
-            errors = redis.Transport._get_errors(self.connection.transport)
+            errors = get_redis_error_classes()
             assert errors
             assert ID in errors[1]
         finally:
@@ -1045,6 +1114,26 @@
                 '\x06\x16\x06\x16queue'
             )
 
+    @patch("redis.client.PubSub.execute_command")
+    def test_global_keyprefix_pubsub(self, mock_execute_command):
+        from kombu.transport.redis import PrefixedStrictRedis
+
+        with Connection(transport=Transport) as conn:
+            client = PrefixedStrictRedis(global_keyprefix='foo_')
+
+            channel = conn.channel()
+            channel.global_keyprefix = 'foo_'
+            channel._create_client = Mock()
+            channel._create_client.return_value = client
+            channel.subclient.connection = Mock()
+            channel.active_fanout_queues.add('a')
+
+            channel._subscribe()
+            mock_execute_command.assert_called_with(
+                'PSUBSCRIBE',
+                'foo_/{db}.a',
+            )
+
 
 class test_Redis:
 

Reply via email to