Hello community,

Here is the log from the commit of package python3-kombu for openSUSE:Factory 
checked in at 2016-01-30 11:31:22
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python3-kombu (Old)
 and      /work/SRC/openSUSE:Factory/.python3-kombu.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python3-kombu"

Changes:
--------
--- /work/SRC/openSUSE:Factory/python3-kombu/python3-kombu.changes      
2015-11-02 12:55:05.000000000 +0100
+++ /work/SRC/openSUSE:Factory/.python3-kombu.new/python3-kombu.changes 
2016-01-30 11:31:23.000000000 +0100
@@ -1,0 +2,36 @@
+Wed Jan 27 04:42:10 UTC 2016 - [email protected]
+
+- specfile:
+  * update copyright year
+
+- update to version 3.0.33:
+  * Now depends on :mod:`amqp` 1.4.9.
+  * Redis: Fixed problem with auxiliary connections causing the main
+    consumer connection to be closed (Issue #550).
+  * Qpid: No longer uses threads to operate, to ensure compatibility
+    with all environments (Issue #531).
+
+- changes from version 3.0.32:
+  * Redis: Fixed bug introduced in 3.0.31 where the redis transport
+    always connects to localhost, regardless of host setting.
+
+- changes from version 3.0.31:
+  * Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+    disconnected.
+  * Hub: Removed debug logging message: "Deregistered fd..." (Issue
+    #549).
+
+- changes from version 3.0.30:
+  * Fixes compatibility with uuid in Python 2.7.11 and 3.5.1. Fix
+    contributed by Kai Groner.
+  * Redis transport: Attempt at fixing problem with hanging consumer
+    after being disconnected from the server.
+  * Event loop: Attempt at fixing issue with 100% CPU when using the
+    Redis transport.
+  * Database transport: Fixed Oracle compatibility. An "ORA-00907:
+    missing right parenthesis" error could manifest when using an
+    Oracle database with the database transport.  Fix contributed by
+    Deepak N.
+  * Documentation fixes contributed by Tommaso Barbugli.
+
+-------------------------------------------------------------------

Old:
----
  kombu-3.0.29.tar.gz

New:
----
  kombu-3.0.33.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python3-kombu.spec ++++++
--- /var/tmp/diff_new_pack.IiprJn/_old  2016-01-30 11:31:24.000000000 +0100
+++ /var/tmp/diff_new_pack.IiprJn/_new  2016-01-30 11:31:24.000000000 +0100
@@ -1,7 +1,7 @@
 #
 # spec file for package python3-kombu
 #
-# Copyright (c) 2015 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2016 SUSE LINUX GmbH, Nuernberg, Germany.
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -17,7 +17,7 @@
 
 
 Name:           python3-kombu
-Version:        3.0.29
+Version:        3.0.33
 Release:        0
 Summary:        AMQP Messaging Framework for Python
 License:        BSD-3-Clause

++++++ kombu-3.0.29.tar.gz -> kombu-3.0.33.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/AUTHORS new/kombu-3.0.33/AUTHORS
--- old/kombu-3.0.29/AUTHORS    2015-10-26 18:58:25.000000000 +0100
+++ new/kombu-3.0.33/AUTHORS    2016-01-08 23:35:32.000000000 +0100
@@ -49,10 +49,13 @@
 Franck Cuny <[email protected]>
 Germán M. Bravo <[email protected]>
 Gregory Haskins <[email protected]>
+Hank John <[email protected]>
+haridsv
 Hong Minhee <[email protected]>
 Ian Eure <[email protected]>
 Ian Struble <[email protected]>
 Ionel Maries Cristian <[email protected]>
+iSlava <[email protected]>
 James Saryerwinnie <[email protected]>
 James Turk <[email protected]>
 Jason Cater <[email protected]>
@@ -66,6 +69,8 @@
 John Watson <[email protected]>
 Jonathan Halcrow <[email protected]>
 Joseph Crosland <[email protected]>
+Joshua Harlow <[email protected]>
+Kai Groner <[email protected]>
 Keith Fitzgerald <[email protected]>
 Kevin McCarthy <[email protected]>
 Kevin McDonald <[email protected]>
@@ -74,8 +79,10 @@
 Mahendra M <[email protected]>
 Marcin Lulek (ergo) <[email protected]>
 Mark Lavin <[email protected]>
+markow <[email protected]>
 Matt Wise <[email protected]>
 Maxime Rouyrre <[email protected]>
+mdk <[email protected]>
 Mher Movsisyan <[email protected]>
 Michael Barrett <[email protected]>
 Michael Nelson <[email protected]>
@@ -123,6 +130,3 @@
 Vincent Driessen <[email protected]>
 Zach Smith <[email protected]>
 Zhao Xiaohong <[email protected]>
-haridsv
-iSlava <[email protected]>
-markow <[email protected]>
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/Changelog new/kombu-3.0.33/Changelog
--- old/kombu-3.0.29/Changelog  2015-10-26 19:10:33.000000000 +0100
+++ new/kombu-3.0.33/Changelog  2016-01-09 03:36:24.000000000 +0100
@@ -4,6 +4,71 @@
  Change history
 ================
 
+.. _version-3.0.33:
+
+3.0.33
+======
+:release-date: 2016-01-08 06:36 P.M PST
+:release-by: Ask Solem
+
+- Now depends on :mod:`amqp` 1.4.9.
+
+- Redis: Fixed problem with auxilliary connections causing the main
+  consumer connection to be closed (Issue #550).
+
+- Qpid: No longer uses threads to operate, to ensure compatibility with
+  all environments (Issue #531).
+
+.. _version-3.0.32:
+
+3.0.32
+======
+:release-date: 2015-12-16 02:29 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.31 where the redis transport always
+  connects to localhost, regardless of host setting.
+
+.. _version-3.0.31:
+
+3.0.31
+======
+:release-date: 2015-12-16 12:00 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+  disconnected.
+
+- Hub: Removed debug logging message: "Deregistered fd..." (Issue #549).
+
+.. _version-3.0.30:
+
+3.0.30
+======
+:release-date: 2015-12-07 12:28 A.M PST
+:release-by: Ask Solem
+
+- Fixes compatiblity with uuid in Python 2.7.11 and 3.5.1.
+
+    Fix contributed by Kai Groner.
+
+- Redis transport: Attempt at fixing problem with hanging consumer
+  after disconnected from server.
+
+- Event loop:
+    Attempt at fixing issue with 100% CPU when using the Redis transport,
+
+- Database transport: Fixed oracle compatiblity.
+
+    An "ORA-00907: missing right parenthesis" error could manifest when using
+    an Oracle database with the database transport.
+
+    Fix contributed by Deepak N.
+
+- Documentation fixes
+
+    Contributed by Tommaso Barbugli.
+
 .. _version-3.0.29:
 
 3.0.29
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/PKG-INFO new/kombu-3.0.33/PKG-INFO
--- old/kombu-3.0.29/PKG-INFO   2015-10-26 19:13:16.000000000 +0100
+++ new/kombu-3.0.33/PKG-INFO   2016-01-09 03:38:13.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.29
+Version: 3.0.33
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.29
+        :Version: 3.0.33
         
         `Kombu` is a messaging library for Python.
         
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/README.rst new/kombu-3.0.33/README.rst
--- old/kombu-3.0.29/README.rst 2015-10-26 19:10:16.000000000 +0100
+++ new/kombu-3.0.33/README.rst 2016-01-09 03:12:59.000000000 +0100
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.29
+:Version: 3.0.33
 
 `Kombu` is a messaging library for Python.
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/docs/changelog.rst 
new/kombu-3.0.33/docs/changelog.rst
--- old/kombu-3.0.29/docs/changelog.rst 2015-10-26 19:10:33.000000000 +0100
+++ new/kombu-3.0.33/docs/changelog.rst 2016-01-09 03:36:24.000000000 +0100
@@ -4,6 +4,71 @@
  Change history
 ================
 
+.. _version-3.0.33:
+
+3.0.33
+======
+:release-date: 2016-01-08 06:36 P.M PST
+:release-by: Ask Solem
+
+- Now depends on :mod:`amqp` 1.4.9.
+
+- Redis: Fixed problem with auxilliary connections causing the main
+  consumer connection to be closed (Issue #550).
+
+- Qpid: No longer uses threads to operate, to ensure compatibility with
+  all environments (Issue #531).
+
+.. _version-3.0.32:
+
+3.0.32
+======
+:release-date: 2015-12-16 02:29 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.31 where the redis transport always
+  connects to localhost, regardless of host setting.
+
+.. _version-3.0.31:
+
+3.0.31
+======
+:release-date: 2015-12-16 12:00 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+  disconnected.
+
+- Hub: Removed debug logging message: "Deregistered fd..." (Issue #549).
+
+.. _version-3.0.30:
+
+3.0.30
+======
+:release-date: 2015-12-07 12:28 A.M PST
+:release-by: Ask Solem
+
+- Fixes compatiblity with uuid in Python 2.7.11 and 3.5.1.
+
+    Fix contributed by Kai Groner.
+
+- Redis transport: Attempt at fixing problem with hanging consumer
+  after disconnected from server.
+
+- Event loop:
+    Attempt at fixing issue with 100% CPU when using the Redis transport,
+
+- Database transport: Fixed oracle compatiblity.
+
+    An "ORA-00907: missing right parenthesis" error could manifest when using
+    an Oracle database with the database transport.
+
+    Fix contributed by Deepak N.
+
+- Documentation fixes
+
+    Contributed by Tommaso Barbugli.
+
 .. _version-3.0.29:
 
 3.0.29
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/docs/introduction.rst 
new/kombu-3.0.33/docs/introduction.rst
--- old/kombu-3.0.29/docs/introduction.rst      2015-10-26 19:10:16.000000000 
+0100
+++ new/kombu-3.0.33/docs/introduction.rst      2016-01-09 03:12:59.000000000 
+0100
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.29
+:Version: 3.0.33
 
 `Kombu` is a messaging library for Python.
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/docs/userguide/pools.rst 
new/kombu-3.0.33/docs/userguide/pools.rst
--- old/kombu-3.0.29/docs/userguide/pools.rst   2015-10-26 18:43:46.000000000 
+0100
+++ new/kombu-3.0.33/docs/userguide/pools.rst   2016-01-08 23:35:32.000000000 
+0100
@@ -155,7 +155,7 @@
     from kombu import pools
     from kombu import Connection
 
-    connections = pools.Connection(limit=100)
+    connections = pools.Connections(limit=100)
     producers = pools.Producers(limit=connections.limit)
 
     connection = Connection('amqp://guest:guest@localhost:5672//')
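
The documentation fix above corrects the pool group class name to pools.Connections
(plural). For context, a minimal usage sketch of that API, assuming a reachable
broker at the default localhost URL and a hypothetical queue name:

    from kombu import Connection
    from kombu import pools

    # A group of at most 100 pooled connections (note the plural class name).
    connections = pools.Connections(limit=100)
    producers = pools.Producers(limit=connections.limit)

    connection = Connection('amqp://guest:guest@localhost:5672//')

    # Index the group by a Connection to get its pool; acquire() returns the
    # resource to the pool when the block exits.
    with connections[connection].acquire(block=True) as conn:
        conn.ensure_connection()

    with producers[connection].acquire(block=True) as producer:
        producer.publish({'hello': 'world'}, exchange='',
                         routing_key='test_queue',  # hypothetical queue name
                         serializer='json')
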
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/docs/userguide/serialization.rst 
new/kombu-3.0.33/docs/userguide/serialization.rst
--- old/kombu-3.0.29/docs/userguide/serialization.rst   2015-10-26 
18:43:46.000000000 +0100
+++ new/kombu-3.0.33/docs/userguide/serialization.rst   2016-01-08 
23:35:32.000000000 +0100
@@ -106,10 +106,10 @@
 ======================================
 
 In some cases, you don't need your message data to be serialized. If you
-pass in a plain string or Unicode object as your message, then `Kombu` will
+pass in a plain string or Unicode object as your message and a custom 
`content_type`, then `Kombu` will
 not waste cycles serializing/deserializing the data.
 
-You can optionally specify a `content_type` and `content_encoding`
+You can optionally specify a `content_encoding`
 for the raw data::
 
     >>> with open("~/my_picture.jpg", "rb") as fh:
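
The serialization documentation change above narrows the claim: Kombu only skips
serialization for a plain string payload when a custom content_type is supplied.
A sketch of the resulting publish call, assuming a reachable broker and a
hypothetical "media" exchange and routing key:

    from kombu import Connection, Exchange, Producer

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        producer = Producer(conn.channel(), exchange=Exchange('media'))
        with open('picture.jpg', 'rb') as fh:
            producer.publish(
                fh.read(),
                routing_key='pictures',        # hypothetical routing key
                content_type='image/jpeg',     # payload is already encoded,
                content_encoding='binary',     # so no serializer is applied
                serializer=None,
                compression=None,
            )
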
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/extra/appveyor/install.ps1 
new/kombu-3.0.33/extra/appveyor/install.ps1
--- old/kombu-3.0.29/extra/appveyor/install.ps1 1970-01-01 01:00:00.000000000 
+0100
+++ new/kombu-3.0.33/extra/appveyor/install.ps1 2015-12-30 00:12:28.000000000 
+0100
@@ -0,0 +1,85 @@
+# Sample script to install Python and pip under Windows
+# Authors: Olivier Grisel and Kyle Kastner
+# License: CC0 1.0 Universal: http://creativecommons.org/publicdomain/zero/1.0/
+
+$BASE_URL = "https://www.python.org/ftp/python/";
+$GET_PIP_URL = "https://bootstrap.pypa.io/get-pip.py";
+$GET_PIP_PATH = "C:\get-pip.py"
+
+
+function DownloadPython ($python_version, $platform_suffix) {
+    $webclient = New-Object System.Net.WebClient
+    $filename = "python-" + $python_version + $platform_suffix + ".msi"
+    $url = $BASE_URL + $python_version + "/" + $filename
+
+    $basedir = $pwd.Path + "\"
+    $filepath = $basedir + $filename
+    if (Test-Path $filename) {
+        Write-Host "Reusing" $filepath
+        return $filepath
+    }
+
+    # Download and retry up to 5 times in case of network transient errors.
+    Write-Host "Downloading" $filename "from" $url
+    $retry_attempts = 3
+    for($i=0; $i -lt $retry_attempts; $i++){
+        try {
+            $webclient.DownloadFile($url, $filepath)
+            break
+        }
+        Catch [Exception]{
+            Start-Sleep 1
+        }
+   }
+   Write-Host "File saved at" $filepath
+   return $filepath
+}
+
+
+function InstallPython ($python_version, $architecture, $python_home) {
+    Write-Host "Installing Python" $python_version "for" $architecture "bit 
architecture to" $python_home
+    if (Test-Path $python_home) {
+        Write-Host $python_home "already exists, skipping."
+        return $false
+    }
+    if ($architecture -eq "32") {
+        $platform_suffix = ""
+    } else {
+        $platform_suffix = ".amd64"
+    }
+    $filepath = DownloadPython $python_version $platform_suffix
+    Write-Host "Installing" $filepath "to" $python_home
+    $args = "/qn /i $filepath TARGETDIR=$python_home"
+    Write-Host "msiexec.exe" $args
+    Start-Process -FilePath "msiexec.exe" -ArgumentList $args -Wait -Passthru
+    Write-Host "Python $python_version ($architecture) installation complete"
+    return $true
+}
+
+
+function InstallPip ($python_home) {
+    $pip_path = $python_home + "/Scripts/pip.exe"
+    $python_path = $python_home + "/python.exe"
+    if (-not(Test-Path $pip_path)) {
+        Write-Host "Installing pip..."
+        $webclient = New-Object System.Net.WebClient
+        $webclient.DownloadFile($GET_PIP_URL, $GET_PIP_PATH)
+        Write-Host "Executing:" $python_path $GET_PIP_PATH
+        Start-Process -FilePath "$python_path" -ArgumentList "$GET_PIP_PATH" 
-Wait -Passthru
+    } else {
+        Write-Host "pip already installed."
+    }
+}
+
+function InstallPackage ($python_home, $pkg) {
+    $pip_path = $python_home + "/Scripts/pip.exe"
+    & $pip_path install $pkg
+}
+
+function main () {
+    InstallPython $env:PYTHON_VERSION $env:PYTHON_ARCH $env:PYTHON
+    InstallPip $env:PYTHON
+    InstallPackage $env:PYTHON wheel
+}
+
+main
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/extra/appveyor/run_with_compiler.cmd 
new/kombu-3.0.33/extra/appveyor/run_with_compiler.cmd
--- old/kombu-3.0.29/extra/appveyor/run_with_compiler.cmd       1970-01-01 
01:00:00.000000000 +0100
+++ new/kombu-3.0.33/extra/appveyor/run_with_compiler.cmd       2015-12-30 
00:12:28.000000000 +0100
@@ -0,0 +1,47 @@
+:: To build extensions for 64 bit Python 3, we need to configure environment
+:: variables to use the MSVC 2010 C++ compilers from GRMSDKX_EN_DVD.iso of:
+:: MS Windows SDK for Windows 7 and .NET Framework 4 (SDK v7.1)
+::
+:: To build extensions for 64 bit Python 2, we need to configure environment
+:: variables to use the MSVC 2008 C++ compilers from GRMSDKX_EN_DVD.iso of:
+:: MS Windows SDK for Windows 7 and .NET Framework 3.5 (SDK v7.0)
+::
+:: 32 bit builds do not require specific environment configurations.
+::
+:: Note: this script needs to be run with the /E:ON and /V:ON flags for the
+:: cmd interpreter, at least for (SDK v7.0)
+::
+:: More details at:
+:: https://github.com/cython/cython/wiki/64BitCythonExtensionsOnWindows
+:: http://stackoverflow.com/a/13751649/163740
+::
+:: Author: Olivier Grisel
+:: License: CC0 1.0 Universal: 
http://creativecommons.org/publicdomain/zero/1.0/
+@ECHO OFF
+
+SET COMMAND_TO_RUN=%*
+SET WIN_SDK_ROOT=C:\Program Files\Microsoft SDKs\Windows
+
+SET MAJOR_PYTHON_VERSION="%PYTHON_VERSION:~0,1%"
+IF %MAJOR_PYTHON_VERSION% == "2" (
+    SET WINDOWS_SDK_VERSION="v7.0"
+) ELSE IF %MAJOR_PYTHON_VERSION% == "3" (
+    SET WINDOWS_SDK_VERSION="v7.1"
+) ELSE (
+    ECHO Unsupported Python version: "%MAJOR_PYTHON_VERSION%"
+    EXIT 1
+)
+
+IF "%PYTHON_ARCH%"=="64" (
+    ECHO Configuring Windows SDK %WINDOWS_SDK_VERSION% for Python 
%MAJOR_PYTHON_VERSION% on a 64 bit architecture
+    SET DISTUTILS_USE_SDK=1
+    SET MSSdk=1
+    "%WIN_SDK_ROOT%\%WINDOWS_SDK_VERSION%\Setup\WindowsSdkVer.exe" -q 
-version:%WINDOWS_SDK_VERSION%
+    "%WIN_SDK_ROOT%\%WINDOWS_SDK_VERSION%\Bin\SetEnv.cmd" /x64 /release
+    ECHO Executing: %COMMAND_TO_RUN%
+    call %COMMAND_TO_RUN% || EXIT 1
+) ELSE (
+    ECHO Using default MSVC build environment for 32 bit architecture
+    ECHO Executing: %COMMAND_TO_RUN%
+    call %COMMAND_TO_RUN% || EXIT 1
+)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/__init__.py 
new/kombu-3.0.33/kombu/__init__.py
--- old/kombu-3.0.29/kombu/__init__.py  2015-10-26 19:10:12.000000000 +0100
+++ new/kombu-3.0.33/kombu/__init__.py  2016-01-09 03:12:55.000000000 +0100
@@ -11,7 +11,7 @@
     'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
 )
 
-VERSION = version_info_t(3, 0, 29, '', '')
+VERSION = version_info_t(3, 0, 33, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = '[email protected]'
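
Since the version bump is reflected in kombu/__init__.py above, the installed
module can be used to confirm the update; a quick check, expected output shown
as comments:

    import kombu

    print(kombu.__version__)   # '3.0.33'
    print(kombu.VERSION)       # version_info_t(major=3, minor=0, micro=33,
                               #                releaselevel='', serial='')
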
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/async/hub.py 
new/kombu-3.0.33/kombu/async/hub.py
--- old/kombu-3.0.29/kombu/async/hub.py 2015-10-26 18:43:46.000000000 +0100
+++ new/kombu-3.0.33/kombu/async/hub.py 2016-01-08 23:35:32.000000000 +0100
@@ -30,6 +30,10 @@
 
 _current_loop = None
 
+W_UNKNOWN_EVENT = """\
+Received unknown event %r for fd %r, please contact support!\
+"""
+
 
 class Stop(BaseException):
     """Stops the event loop."""
@@ -146,12 +150,18 @@
                     logger.error('Error in timer: %r', exc, exc_info=1)
         return min(delay or min_delay, max_delay)
 
+    def _remove_from_loop(self, fd):
+        try:
+            self._unregister(fd)
+        finally:
+            self._discard(fd)
+
     def add(self, fd, callback, flags, args=(), consolidate=False):
         fd = fileno(fd)
         try:
             self.poller.register(fd, flags)
         except ValueError:
-            self._discard(fd)
+            self._remove_from_loop(fd)
             raise
         else:
             dest = self.readers if flags & READ else self.writers
@@ -163,8 +173,7 @@
 
     def remove(self, fd):
         fd = fileno(fd)
-        self._unregister(fd)
-        self._discard(fd)
+        self._remove_from_loop(fd)
 
     def run_forever(self):
         self._running = True
@@ -207,8 +216,7 @@
         writable = fd in self.writers
         on_write = self.writers.get(fd)
         try:
-            self._unregister(fd)
-            self._discard(fd)
+            self._remove_from_loop(fd)
         finally:
             if writable:
                 cb, args = on_write
@@ -218,8 +226,7 @@
         readable = fd in self.readers
         on_read = self.readers.get(fd)
         try:
-            self._unregister(fd)
-            self._discard(fd)
+            self._remove_from_loop(fd)
         finally:
             if readable:
                 cb, args = on_read
@@ -280,6 +287,7 @@
                     raise StopIteration()
 
                 for fd, event in events or ():
+                    general_error = False
                     if fd in consolidate and \
                             writers.get(fd) is None:
                         to_consolidate.append(fd)
@@ -299,6 +307,12 @@
                             self.remove_writer(fd)
                             continue
                     elif event & ERR:
+                        general_error = True
+                    else:
+                        logger.info(W_UNKNOWN_EVENT, event, fd)
+                        general_error = True
+
+                    if general_error:
                         try:
                             cb, cbargs = (readers.get(fd) or
                                           writers.get(fd))
@@ -306,7 +320,9 @@
                             pass
 
                     if cb is None:
+                        self.remove(fd)
                         continue
+
                     if isinstance(cb, generator):
                         try:
                             next(cb)
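
The hub changes above consolidate file-descriptor cleanup into a single
_remove_from_loop() helper and drop descriptors that report unknown events. A
toy sketch (not kombu's actual Hub) of that cleanup pattern, showing why the
try/finally matters:

    import select

    class TinyHub(object):
        def __init__(self):
            self.poller = select.poll()   # assumes a platform with poll()
            self.readers = {}
            self.writers = {}

        def _unregister(self, fd):
            try:
                self.poller.unregister(fd)
            except (KeyError, OSError):
                pass  # fd was never registered or is already gone

        def _discard(self, fd):
            self.readers.pop(fd, None)
            self.writers.pop(fd, None)

        def _remove_from_loop(self, fd):
            # Even if unregistering raises, the callback tables are still
            # cleared, so the loop never keeps servicing a dead descriptor.
            try:
                self._unregister(fd)
            finally:
                self._discard(fd)
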
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/five.py 
new/kombu-3.0.33/kombu/five.py
--- old/kombu-3.0.29/kombu/five.py      2015-10-26 18:43:47.000000000 +0100
+++ new/kombu-3.0.33/kombu/five.py      2016-01-08 23:35:32.000000000 +0100
@@ -41,13 +41,12 @@
     import platform
     SYSTEM = platform.system()
 
-    has_ctypes = True
     try:
         import ctypes
-    except ImportError:
-        has_ctypes = False
+    except ImportError:  # pragma: no cover
+        ctypes = None  # noqa
 
-    if SYSTEM == 'Darwin' and has_ctypes:
+    if SYSTEM == 'Darwin' and ctypes is not None:
         from ctypes.util import find_library
         libSystem = ctypes.CDLL(find_library('libSystem.dylib'))
         CoreServices = ctypes.CDLL(find_library('CoreServices'),
@@ -61,7 +60,7 @@
         def _monotonic():
             return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
 
-    elif SYSTEM == 'Linux' and has_ctypes:
+    elif SYSTEM == 'Linux' and ctypes is not None:
         # from stackoverflow:
         # questions/1205722/how-do-i-get-monotonic-time-durations-in-python
         import os
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/tests/transport/test_qpid.py 
new/kombu-3.0.33/kombu/tests/transport/test_qpid.py
--- old/kombu-3.0.29/kombu/tests/transport/test_qpid.py 2015-10-26 
18:43:47.000000000 +0100
+++ new/kombu-3.0.33/kombu/tests/transport/test_qpid.py 2016-01-08 
23:35:32.000000000 +0100
@@ -4,7 +4,6 @@
 import ssl
 import socket
 import sys
-import threading
 import time
 
 from collections import Callable
@@ -16,7 +15,7 @@
 from kombu.five import Empty, keys, range, monotonic
 from kombu.transport.qpid import (AuthenticationFailure, Channel, Connection,
                                   ConnectionError, Message, NotFound, QoS,
-                                  ReceiversMonitor, Transport)
+                                  Transport)
 from kombu.transport.virtual import Base64
 from kombu.tests.case import Case, Mock, case_no_pypy, case_no_python3
 from kombu.tests.case import patch
@@ -855,9 +854,9 @@
         self.mock_queue = Mock()
 
     def tearDown(self):
-        self.mock__has_queue.stop()
-        self.mock__size.stop()
-        self.mock__delete.stop()
+        self.patch__has_queue.stop()
+        self.patch__size.stop()
+        self.patch__delete.stop()
         super(TestChannelQueueDelete, self).tearDown()
 
     def test_checks_if_queue_exists(self):
@@ -1397,164 +1396,6 @@
 
 @case_no_python3
 @case_no_pypy
-class ReceiversMonitorTestBase(Case):
-
-    def setUp(self):
-        self.mock_session = Mock()
-        self.mock_w = Mock()
-        self.monitor = ReceiversMonitor(self.mock_session, self.mock_w)
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorType(ReceiversMonitorTestBase):
-
-    def test_qpid_messaging_receivers_monitor_subclass_of_threading(self):
-        self.assertIsInstance(self.monitor, threading.Thread)
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorInit(ReceiversMonitorTestBase):
-
-    def setUp(self):
-        thread___init___str = QPID_MODULE + '.threading.Thread.__init__'
-        self.patch_parent___init__ = patch(thread___init___str)
-        self.mock_Thread___init__ = self.patch_parent___init__.start()
-        super(TestReceiversMonitorInit, self).setUp()
-
-    def tearDown(self):
-        self.patch_parent___init__.stop()
-
-    def test_qpid_messaging_receivers_monitor_init_saves_session(self):
-        self.assertIs(self.monitor._session, self.mock_session)
-
-    def test_qpid_messaging_receivers_monitor_init_saves_fd(self):
-        self.assertIs(self.monitor._w_fd, self.mock_w)
-
-    def test_qpid_messaging_Receivers_monitor_init_calls_parent__init__(self):
-        self.mock_Thread___init__.assert_called_once_with()
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorRun(ReceiversMonitorTestBase):
-
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    def test_receivers_monitor_run_calls_monitor_receivers(
-            self, mock_sleep, mock_monitor_receivers):
-        mock_sleep.side_effect = BreakOutException()
-        with self.assertRaises(BreakOutException):
-            self.monitor.run()
-        mock_monitor_receivers.assert_called_once_with()
-
-    @patch(QPID_MODULE + '.SessionClosed', new=QpidException)
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    def test_receivers_monitor_run_exits_on_session_closed(
-            self, mock_sleep, mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        try:
-            self.monitor.run()
-        except Exception:
-            self.fail('No exception should be raised here')
-        mock_monitor_receivers.assert_called_once_with()
-        mock_sleep.has_calls([])
-
-    @patch.object(Transport, 'connection_errors', new=(BreakOutException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    def test_receivers_monitors_run_calls_logs_exception_and_sleeps(
-            self, mock_logger, mock_sleep, mock_monitor_receivers):
-        exc_to_raise = IOError()
-        mock_monitor_receivers.side_effect = exc_to_raise
-        mock_sleep.side_effect = BreakOutException()
-        with self.assertRaises(BreakOutException):
-            self.monitor.run()
-        mock_logger.error.assert_called_once_with(exc_to_raise, exc_info=1)
-        mock_sleep.assert_called_once_with(10)
-
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    def test_receivers_monitor_run_loops_when_exception_is_raised(
-            self, mock_sleep, mock_monitor_receivers):
-        def return_once_raise_on_second_call(*args):
-            mock_sleep.side_effect = BreakOutException()
-        mock_sleep.side_effect = return_once_raise_on_second_call
-        with self.assertRaises(BreakOutException):
-            self.monitor.run()
-        mock_monitor_receivers.has_calls([call(), call()])
-
-    @patch.object(Transport, 'recoverable_connection_errors',
-                  new=(QpidException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    @patch(QPID_MODULE + '.os.write')
-    @patch(QPID_MODULE + '.sys.exc_info')
-    def test_receivers_monitor_exits_when_recoverable_exception_raised(
-            self, mock_sys_exc_info, mock_os_write, mock_logger, mock_sleep,
-            mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        mock_sleep.side_effect = BreakOutException()
-        self.monitor.run()
-        self.assertFalse(mock_logger.error.called)
-
-    @patch.object(Transport, 'recoverable_connection_errors',
-                  new=(QpidException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    @patch(QPID_MODULE + '.os.write')
-    def test_receivers_monitor_saves_exception_when_recoverable_exc_raised(
-            self, mock_os_write, mock_logger, mock_sleep,
-            mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        mock_sleep.side_effect = BreakOutException()
-        self.monitor.run()
-        self.assertIs(
-            self.mock_session.saved_exception,
-            mock_monitor_receivers.side_effect,
-        )
-
-    @patch.object(Transport, 'recoverable_connection_errors',
-                  new=(QpidException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    @patch(QPID_MODULE + '.os.write')
-    @patch(QPID_MODULE + '.sys.exc_info')
-    def test_receivers_monitor_writes_e_to_pipe_when_recoverable_exc_raised(
-            self, mock_sys_exc_info, mock_os_write, mock_logger, mock_sleep,
-            mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        mock_sleep.side_effect = BreakOutException()
-        self.monitor.run()
-        mock_os_write.assert_called_once_with(self.mock_w, 'e')
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorMonitorReceivers(ReceiversMonitorTestBase):
-
-    def test_receivers_monitor_monitor_receivers_calls_next_receivers(self):
-        self.mock_session.next_receiver.side_effect = BreakOutException()
-        with self.assertRaises(BreakOutException):
-            self.monitor.monitor_receivers()
-        self.mock_session.next_receiver.assert_called_once_with()
-
-    def test_receivers_monitor_monitor_receivers_writes_to_fd(self):
-        with patch(QPID_MODULE + '.os.write') as mock_os_write:
-            mock_os_write.side_effect = BreakOutException()
-            with self.assertRaises(BreakOutException):
-                self.monitor.monitor_receivers()
-            mock_os_write.assert_called_once_with(self.mock_w, '0')
-
-
-@case_no_python3
-@case_no_pypy
 @disable_runtime_dependency_check
 class TestTransportInit(Case):
 
@@ -1699,12 +1540,6 @@
         self.transport = Transport(self.client)
         self.mock_conn = Mock()
         self.transport.Connection = self.mock_conn
-        path_to_mock = QPID_MODULE + '.ReceiversMonitor'
-        self.patcher = patch(path_to_mock)
-        self.mock_ReceiverMonitor = self.patcher.start()
-
-    def tearDown(self):
-        self.patcher.stop()
 
     def test_transport_establish_conn_new_option_overwrites_default(self):
         self.client.userid = 'new-userid'
@@ -1863,22 +1698,7 @@
         new_conn = self.transport.establish_connection()
         self.assertIs(new_conn, self.mock_conn.return_value)
 
-    def test_transport_establish_conn_creates_ReceiversMonitor(self):
-        self.transport.establish_connection()
-        self.mock_ReceiverMonitor.assert_called_once_with(
-            self.transport.session, self.transport._w,
-        )
-
-    def test_transport_establish_conn_daemonizes_thread(self):
-        self.transport.establish_connection()
-        self.assertTrue(self.mock_ReceiverMonitor.return_value.daemon)
-
-    def test_transport_establish_conn_starts_thread(self):
-        self.transport.establish_connection()
-        new_receiver_monitor = self.mock_ReceiverMonitor.return_value
-        new_receiver_monitor.start.assert_called_once_with()
-
-    def test_transport_establish_conn_ignores_hostname_if_not_localhost(self):
+    def test_transport_establish_conn_uses_hostname_if_not_default(self):
         self.client.hostname = 'some_other_hostname'
         self.transport.establish_connection()
         self.mock_conn.assert_called_once_with(
@@ -1889,6 +1709,14 @@
             transport='tcp',
         )
 
+    def test_transport_sets_qpid_message_received_handler(self):
+        self.transport.establish_connection()
+        qpid_conn = self.mock_conn.return_value.get_qpid_connection
+        new_mock_session = qpid_conn.return_value.session.return_value
+        mock_set_callback = new_mock_session.set_message_received_handler
+        expected_callback = self.transport._qpid_session_ready
+        mock_set_callback.assert_called_once_with(expected_callback)
+
 
 @case_no_python3
 @case_no_pypy
@@ -1941,6 +1769,24 @@
 @case_no_python3
 @case_no_pypy
 @disable_runtime_dependency_check
+class TestTransportQpidSessionReady(Case):
+
+    def setUp(self):
+        self.patch_a = patch(QPID_MODULE + '.os.write')
+        self.mock_os_write = self.patch_a.start()
+        self.transport = Transport(Mock())
+
+    def tearDown(self):
+        self.patch_a.stop()
+
+    def test_transport__qpid_session_ready_writes_symbol_to_fd(self):
+        self.transport._qpid_session_ready()
+        self.mock_os_write.assert_called_once_with(self.transport._w, '0')
+
+
+@case_no_python3
+@case_no_pypy
+@disable_runtime_dependency_check
 class TestTransportOnReadable(Case):
 
     def setUp(self):
@@ -1952,6 +1798,7 @@
 
     def tearDown(self):
         self.patch_a.stop()
+        self.patch_b.stop()
 
     def test_transport_on_readable_reads_symbol_from_fd(self):
         self.transport.on_readable(Mock(), Mock())
@@ -1971,16 +1818,6 @@
         with self.assertRaises(IOError):
             self.transport.on_readable(Mock(), Mock())
 
-    def test_transport_on_readable_reads_e_off_of_pipe_raises_exc_info(self):
-        self.transport.session = Mock()
-        try:
-            raise IOError()
-        except Exception as error:
-            self.transport.session.saved_exception = error
-        self.mock_os_read.return_value = 'e'
-        with self.assertRaises(IOError):
-            self.transport.on_readable(Mock(), Mock())
-
 
 @case_no_python3
 @case_no_pypy
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/tests/transport/test_redis.py 
new/kombu-3.0.33/kombu/tests/transport/test_redis.py
--- old/kombu-3.0.29/kombu/tests/transport/test_redis.py        2015-10-26 
18:43:47.000000000 +0100
+++ new/kombu-3.0.33/kombu/tests/transport/test_redis.py        2016-01-09 
03:35:11.000000000 +0100
@@ -5,6 +5,7 @@
 
 from anyjson import dumps, loads
 from collections import defaultdict
+from contextlib import contextmanager
 from itertools import count
 
 from kombu import Connection, Exchange, Queue, Consumer, Producer
@@ -18,6 +19,23 @@
 )
 
 
+class JSONEqual(object):
+    # The order in which a dict is serialized to json depends on the hashseed
+    # so we have this to support json in .assert_has_call*.
+
+    def __init__(self, expected):
+        self.expected = expected
+
+    def __eq__(self, other):
+        return loads(other) == loads(self.expected)
+
+    def __str__(self):
+        return self.expected
+
+    def __repr__(self):
+        return '(json)%r' % (self.expected,)
+
+
 class _poll(eventio._select):
 
     def register(self, fd, flags):
@@ -171,6 +189,9 @@
 
         return self
 
+    def __repr__(self):
+        return '<MockClient: %r' % (id(self),)
+
 
 class Pipeline(object):
 
@@ -201,13 +222,21 @@
 
 
 class Channel(redis.Channel):
+    Client = Client
 
-    def _get_client(self):
+    def _get_async_client(self):
         return Client
 
-    def _get_pool(self):
+    def _create_client(self, async=False):
+        return Client()
+
+    def _get_pool(self, async=False):
         return Mock()
 
+    @contextmanager
+    def conn_or_acquire(self, client=None):
+        yield client if client is not None else self._create_client()
+
     def _get_response_error(self):
         return ResponseError
 
@@ -280,8 +309,8 @@
                 self._pool = pool_at_init[0]
                 super(XChannel, self).__init__(*args, **kwargs)
 
-            def _get_client(self):
-                return lambda *_, **__: client
+            def _create_client(self, async=False):
+                return client
 
         class XTransport(Transport):
             Channel = XChannel
@@ -302,9 +331,9 @@
         self.channel._pool = None
         self.channel._after_fork()
 
-        self.channel._pool = Mock(name='pool')
+        pool = self.channel._pool = Mock(name='pool')
         self.channel._after_fork()
-        self.channel._pool.disconnect.assert_called_with()
+        pool.disconnect.assert_called_with()
 
     def test_next_delivery_tag(self):
         self.assertNotEqual(
@@ -332,8 +361,10 @@
         self.channel._do_restore_message(
             pl2, 'ex', 'rkey', client,
         )
+
         client.rpush.assert_has_calls([
-            call('george', spl2), call('elaine', spl2),
+            call('george', JSONEqual(spl2)),
+            call('elaine', JSONEqual(spl2)),
         ])
 
         client.rpush.side_effect = KeyError()
@@ -347,7 +378,8 @@
         message = Mock(name='message')
         with patch('kombu.transport.redis.loads') as loads:
             loads.return_value = 'M', 'EX', 'RK'
-            client = self.channel.client = Mock(name='client')
+            client = self.channel._create_client = Mock(name='client')
+            client = client()
             client.pipeline = ContextMock()
             restore = self.channel._do_restore_message = Mock(
                 name='_do_restore_message',
@@ -376,7 +408,8 @@
             restore.assert_called_with('M', 'EX', 'RK', client, False)
 
     def test_qos_restore_visible(self):
-        client = self.channel.client = Mock(name='client')
+        client = self.channel._create_client = Mock(name='client')
+        client = client()
 
         def pipe(*args, **kwargs):
             return Pipeline(client)
@@ -556,36 +589,37 @@
 
     def test_put_fanout(self):
         self.channel._in_poll = False
-        c = self.channel.client = Mock()
+        c = self.channel._create_client = Mock()
 
         body = {'hello': 'world'}
         self.channel._put_fanout('exchange', body, '')
-        c.publish.assert_called_with('exchange', dumps(body))
+        c().publish.assert_called_with('exchange', JSONEqual(dumps(body)))
 
     def test_put_priority(self):
-        client = self.channel.client = Mock(name='client')
+        client = self.channel._create_client = Mock(name='client')
         msg1 = {'properties': {'delivery_info': {'priority': 3}}}
 
         self.channel._put('george', msg1)
-        client.lpush.assert_called_with(
-            self.channel._q_for_pri('george', 3), dumps(msg1),
+        client().lpush.assert_called_with(
+            self.channel._q_for_pri('george', 3), JSONEqual(dumps(msg1)),
         )
 
         msg2 = {'properties': {'delivery_info': {'priority': 313}}}
         self.channel._put('george', msg2)
-        client.lpush.assert_called_with(
-            self.channel._q_for_pri('george', 9), dumps(msg2),
+        client().lpush.assert_called_with(
+            self.channel._q_for_pri('george', 9), JSONEqual(dumps(msg2)),
         )
 
         msg3 = {'properties': {'delivery_info': {}}}
         self.channel._put('george', msg3)
-        client.lpush.assert_called_with(
-            self.channel._q_for_pri('george', 0), dumps(msg3),
+        client().lpush.assert_called_with(
+            self.channel._q_for_pri('george', 0), JSONEqual(dumps(msg3)),
         )
 
     def test_delete(self):
         x = self.channel
-        self.channel._in_poll = False
+        x._create_client = Mock()
+        x._create_client.return_value = x.client
         delete = x.client.delete = Mock()
         srem = x.client.srem = Mock()
 
@@ -597,7 +631,8 @@
         )
 
     def test_has_queue(self):
-        self.channel._in_poll = False
+        self.channel._create_client = Mock()
+        self.channel._create_client.return_value = self.channel.client
         exists = self.channel.client.exists = Mock()
         exists.return_value = True
         self.assertTrue(self.channel._has_queue('foo'))
@@ -662,16 +697,16 @@
         self.channel._rotate_cycle('elaine')
 
     @skip_if_not_module('redis')
-    def test_get_client(self):
+    def test_get_async_client(self):
         import redis as R
-        KombuRedis = redis.Channel._get_client(self.channel)
+        KombuRedis = redis.Channel._get_async_client(self.channel)
         self.assertTrue(KombuRedis)
 
         Rv = getattr(R, 'VERSION', None)
         try:
             R.VERSION = (2, 4, 0)
             with self.assertRaises(VersionMismatch):
-                redis.Channel._get_client(self.channel)
+                redis.Channel._get_async_client(self.channel)
         finally:
             if Rv is not None:
                 R.VERSION = Rv
@@ -682,24 +717,6 @@
         self.assertIs(redis.Channel._get_response_error(self.channel),
                       ResponseError)
 
-    def test_avail_client_when_not_in_poll(self):
-        self.channel._in_poll = False
-        c = self.channel.client = Mock()
-
-        with self.channel.conn_or_acquire() as client:
-            self.assertIs(client, c)
-
-    def test_avail_client_when_in_poll(self):
-        self.channel._in_poll = True
-        self.channel._pool = Mock()
-        cc = self.channel._create_client = Mock()
-        client = cc.return_value = Mock()
-
-        with self.channel.conn_or_acquire():
-            pass
-        self.channel.pool.release.assert_called_with(client.connection)
-        cc.assert_called_with()
-
     def test_register_with_event_loop(self):
         transport = self.connection.transport
         transport.cycle = Mock(name='cycle')
@@ -908,7 +925,7 @@
             channel._get('does-not-exist')
         channel.close()
 
-    def test_get_client(self):
+    def test_get_async_client(self):
 
         myredis, exceptions = _redis_modules()
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/transport/django/managers.py 
new/kombu-3.0.33/kombu/transport/django/managers.py
--- old/kombu-3.0.29/kombu/transport/django/managers.py 2015-10-26 
18:43:47.000000000 +0100
+++ new/kombu-3.0.33/kombu/transport/django/managers.py 2016-01-08 
23:35:32.000000000 +0100
@@ -52,6 +52,8 @@
 
 
 def select_for_update(qs):
+    if connection.vendor == 'oracle':
+        return qs
     try:
         return qs.select_for_update()
     except AttributeError:
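
The Django transport hunk above is the ORA-00907 fix from the changelog: Oracle
rejects the SELECT ... FOR UPDATE statement generated here, so the queryset is
returned unlocked on that backend. A sketch completing the truncated context
(the AttributeError branch is assumed to return the plain queryset on very old
Django versions):

    from django.db import connection

    def select_for_update(qs):
        if connection.vendor == 'oracle':
            return qs                     # ORA-00907 workaround
        try:
            return qs.select_for_update()
        except AttributeError:            # Django without select_for_update()
            return qs
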
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/transport/qpid.py 
new/kombu-3.0.33/kombu/transport/qpid.py
--- old/kombu-3.0.29/kombu/transport/qpid.py    2015-10-26 18:43:47.000000000 
+0100
+++ new/kombu-3.0.33/kombu/transport/qpid.py    2016-01-09 03:12:35.000000000 
+0100
@@ -8,7 +8,6 @@
 .. _`Qpid`: http://qpid.apache.org/
 .. _`qpid-python`: http://pypi.python.org/pypi/qpid-python/
 .. _`qpid-tools`: http://pypi.python.org/pypi/qpid-tools/
-.. _`Issue 2199`: https://github.com/celery/celery/issues/2199
 
 The use this transport you must install the necessary dependencies. These
 dependencies are available via PyPI and can be installed using the pip
@@ -22,11 +21,6 @@
     to underlying dependencies not being compatible. This version is
     tested and works with with Python 2.7.
 
-.. admonition:: Potential Deadlock
-
-    This transport should be used with caution due to a known
-    potential deadlock. See `Issue 2199`_ for more details.
-
 Authentication
 ==============
 
@@ -79,13 +73,11 @@
 """
 from __future__ import absolute_import
 
-import fcntl
 import os
 import select
 import socket
 import ssl
 import sys
-import threading
 import time
 
 from itertools import count
@@ -94,6 +86,11 @@
 import amqp.protocol
 
 try:
+    import fcntl
+except ImportError:
+    fcntl = None  # noqa
+
+try:
     import qpidtoollibs
 except ImportError:  # pragma: no cover
     qpidtoollibs = None     # noqa
@@ -1327,81 +1324,6 @@
             channel.connection = None
 
 
-class ReceiversMonitor(threading.Thread):
-    """A monitoring thread that reads and handles messages from all receivers.
-
-    A single instance of ReceiversMonitor is expected to be created by
-    :class:`Transport`.
-
-    In :meth:`monitor_receivers`, the thread monitors all receivers
-    associated with the session created by the Transport using the blocking
-    call to session.next_receiver(). When any receiver has messages
-    available, a symbol '0' is written to the self._w_fd file descriptor. The
-    :meth:`monitor_receivers` is designed not to exit, and loops over
-    session.next_receiver() forever.
-
-    The entry point of the thread is :meth:`run` which calls
-    :meth:`monitor_receivers`.
-
-    The thread is designed to be daemonized, and will be forcefully killed
-    when all non-daemon threads have already exited.
-    """
-
-    def __init__(self, session, w):
-        """Instantiate a ReceiversMonitor object
-
-        :param session: The session which needs all of its receivers
-            monitored.
-        :type session: :class:`qpid.messaging.endpoints.Session`
-        :param w: The file descriptor to write the '0' into when
-            next_receiver unblocks.
-        :type w: int
-        """
-        super(ReceiversMonitor, self).__init__()
-        self._session = session
-        self._w_fd = w
-
-    def run(self):
-        """Thread entry point for ReceiversMonitor
-
-        Calls :meth:`monitor_receivers` with a log-and-reenter behavior for
-        non connection errors. This guards against unexpected exceptions
-        which could cause this thread to exit unexpectedly.
-
-        A :class:`qpid.messaging.exceptions.SessionClosed` exception should
-        cause this thread to exit. This is a normal exit condition and the
-        thread is no longer needed.
-
-        If a connection error occurs, the exception needs to be propagated
-        to MainThread where the kombu exception handler can properly handle
-        it. The exception is stored as saved_exception on the self._session
-        object. The character 'e' is then written to the self.w_fd file
-        descriptor and then this thread exits.
-        """
-        while True:
-            try:
-                self.monitor_receivers()
-            except Transport.recoverable_connection_errors as exc:
-                self._session.saved_exception = exc
-                os.write(self._w_fd, 'e')
-                break
-            except SessionClosed:
-                break
-            except Exception as exc:
-                logger.error(exc, exc_info=1)
-            time.sleep(10)
-
-    def monitor_receivers(self):
-        """Monitor all receivers, and write to _w_fd when a message is ready.
-
-        The call to next_receiver() blocks until a message is ready. Once a
-        message is ready, write a '0' to _w_fd.
-        """
-        while True:
-            self._session.next_receiver()
-            os.write(self._w_fd, '0')
-
-
 class Transport(base.Transport):
     """Kombu native transport for a Qpid broker.
 
@@ -1479,7 +1401,8 @@
         self.verify_runtime_environment()
         super(Transport, self).__init__(*args, **kwargs)
         self.r, self._w = os.pipe()
-        fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
+        if fcntl is not None:
+            fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
 
     def verify_runtime_environment(self):
         """Verify that the runtime environment is acceptable.
@@ -1518,24 +1441,23 @@
                 'with your package manager. You can also try `pip install '
                 'qpid-python`.')
 
+    def _qpid_session_ready(self):
+        os.write(self._w, '0')
+
     def on_readable(self, connection, loop):
         """Handle any messages associated with this Transport.
 
         This method clears a single message from the externally monitored
         file descriptor by issuing a read call to the self.r file descriptor
         which removes a single '0' character that was placed into the pipe
-        by :class:`ReceiversMonitor`. Once a '0' is read, all available
-        events are drained through a call to :meth:`drain_events`.
+        by the Qpid session message callback handler. Once a '0' is read,
+        all available events are drained through a call to
+        :meth:`drain_events`.
 
         The behavior of self.r is adjusted in __init__ to be non-blocking,
         ensuring that an accidental call to this method when no more messages
         will arrive will not cause indefinite blocking.
 
-        If the self.r file descriptor receives the character 'e', an error
-        occurred in the background thread, and this thread should raise the
-        saved exception. The exception is stored as saved_exception on the
-        session object.
-
         Nothing is expected to be returned from :meth:`drain_events` because
         :meth:`drain_events` handles messages by calling callbacks that are
         maintained on the :class:`Connection` object. When
@@ -1570,9 +1492,7 @@
             functionality.
         :type loop: kombu.async.Hub
         """
-        symbol = os.read(self.r, 1)
-        if symbol == 'e':
-            raise self.session.saved_exception
+        os.read(self.r, 1)
         try:
             self.drain_events(connection)
         except socket.timeout:
@@ -1584,7 +1504,7 @@
         Register the callback self.on_readable to be called when an
         external epoll loop sees that the file descriptor registered is
         ready for reading. The file descriptor is created by this Transport,
-        and is updated by the ReceiversMonitor thread.
+        and is written to when a message is available.
 
         Because supports_ev == True, Celery expects to call this method to
         give the Transport an opportunity to register a read file descriptor
@@ -1680,9 +1600,7 @@
         conn = self.Connection(**opts)
         conn.client = self.client
         self.session = conn.get_qpid_connection().session()
-        monitor_thread = ReceiversMonitor(self.session, self._w)
-        monitor_thread.daemon = True
-        monitor_thread.start()
+        self.session.set_message_received_handler(self._qpid_session_ready)
         return conn
 
     def close_connection(self, connection):
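
With the changes above, the Qpid transport drops the ReceiversMonitor thread
entirely: the session's message-received handler writes one byte into a pipe,
and the event loop drains events when the read end becomes readable (a classic
self-pipe pattern). A self-contained sketch of that mechanism, not kombu's
actual Transport class:

    import os

    try:
        import fcntl
    except ImportError:      # e.g. Windows
        fcntl = None

    class PipeNotifier(object):
        def __init__(self):
            self.r, self.w = os.pipe()
            if fcntl is not None:
                # Non-blocking read end: a spurious wakeup cannot hang the loop.
                fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)

        def on_message_ready(self):
            # Registered as the library callback; one byte per notification.
            os.write(self.w, b'0')

        def on_readable(self):
            # Called by the event loop when self.r is readable: consume the
            # token, then drain whatever messages are pending.
            os.read(self.r, 1)
            # drain_events(connection) would be called here
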
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/transport/redis.py 
new/kombu-3.0.33/kombu/transport/redis.py
--- old/kombu-3.0.29/kombu/transport/redis.py   2015-10-26 18:43:47.000000000 
+0100
+++ new/kombu-3.0.33/kombu/transport/redis.py   2016-01-08 23:35:32.000000000 
+0100
@@ -97,6 +97,11 @@
     )
 
 
+def get_redis_ConnectionError():
+    from redis import exceptions
+    return exceptions.ConnectionError
+
+
 class MutexHeld(Exception):
     pass
 
@@ -373,6 +378,7 @@
 
     _client = None
     _subclient = None
+    _closing = False
     supports_fanout = True
     keyprefix_queue = '_kombu.binding.%s'
     keyprefix_fanout = '/{db}.'
@@ -401,7 +407,10 @@
     #: and binding keys (like a topic exchange but using PUB/SUB).
     #: This will be enabled by default in a future version.
     fanout_patterns = False
+
+    _async_pool = None
     _pool = None
+    _disconnecting_pools = False
 
     from_transport_options = (
         virtual.Channel.from_transport_options +
@@ -427,7 +436,8 @@
             self.QoS = virtual.QoS
 
         self._queue_cycle = []
-        self.Client = self._get_client()
+        self.AsyncClient = self._get_async_client()
+        self.Client = redis.Redis
         self.ResponseError = self._get_response_error()
         self.active_fanout_queues = set()
         self.auto_delete_queues = set()
@@ -446,8 +456,7 @@
         try:
             self.client.info()
         except Exception:
-            if self._pool:
-                self._pool.disconnect()
+            self._disconnect_pools()
             raise
 
         self.connection.cycle.add(self)  # add to channel poller.
@@ -458,12 +467,28 @@
         register_after_fork(self, self._after_fork)
 
     def _after_fork(self):
-        if self._pool is not None:
-            self._pool.disconnect()
+        self._disconnect_pools()
+
+    def _disconnect_pools(self):
+        if not self._disconnecting_pools:
+            self._disconnecting_pools = True
+            try:
+                if self._async_pool is not None:
+                    self._async_pool.disconnect()
+                if self._pool is not None:
+                    self._pool.disconnect()
+                self._async_pool = self._pool = None
+            finally:
+                self._disconnecting_pools = False
 
     def _on_connection_disconnect(self, connection):
+        self._in_poll = False
+        self._in_listen = False
         if self.connection and self.connection.cycle:
             self.connection.cycle._on_connection_disconnect(connection)
+        self._disconnect_pools()
+        if not self._closing:
+            raise get_redis_ConnectionError()
 
     def _do_restore_message(self, payload, exchange, routing_key,
                             client=None, leftmost=False):
@@ -738,8 +763,8 @@
                 return sum(sizes[::2])
 
     def close(self):
-        if self._pool:
-            self._pool.disconnect()
+        self._closing = True
+        self._disconnect_pools()
         if not self.closed:
             # remove from channel poller.
             self.connection.cycle.discard(self)
@@ -776,7 +801,7 @@
                     ))
         return vhost
 
-    def _connparams(self):
+    def _connparams(self, async=False):
         conninfo = self.connection.client
         connparams = {'host': conninfo.hostname or '127.0.0.1',
                       'port': conninfo.port or DEFAULT_PORT,
@@ -803,52 +828,48 @@
             redis.Connection
         )
 
-        class Connection(connection_cls):
-            def disconnect(self):
-                channel._on_connection_disconnect(self)
-                super(Connection, self).disconnect()
-        connparams['connection_class'] = Connection
+        if async:
+            class Connection(connection_cls):
+                def disconnect(self):
+                    super(Connection, self).disconnect()
+                    channel._on_connection_disconnect(self)
+            connparams['connection_class'] = Connection
 
         return connparams
 
-    def _create_client(self):
+    def _create_client(self, async=False):
+        if async:
+            return self.AsyncClient(connection_pool=self.async_pool)
         return self.Client(connection_pool=self.pool)
 
-    def _get_pool(self):
-        params = self._connparams()
+    def _get_pool(self, async=False):
+        params = self._connparams(async=async)
         self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
         return redis.ConnectionPool(**params)
 
-    def _get_client(self):
+    def _get_async_client(self):
         if redis.VERSION < (2, 4, 4):
             raise VersionMismatch(
                 'Redis transport requires redis-py versions 2.4.4 or later. '
                 'You have {0.__version__}'.format(redis))
 
-        # KombuRedis maintains a connection attribute on it's instance and
+        # AsyncRedis maintains a connection attribute on it's instance and
         # uses that when executing commands
         # This was added after redis-py was changed.
-        class KombuRedis(redis.Redis):  # pragma: no cover
+        class AsyncRedis(redis.Redis):  # pragma: no cover
 
             def __init__(self, *args, **kwargs):
-                super(KombuRedis, self).__init__(*args, **kwargs)
+                super(AsyncRedis, self).__init__(*args, **kwargs)
                 self.connection = self.connection_pool.get_connection('_')
 
-        return KombuRedis
+        return AsyncRedis
 
     @contextmanager
     def conn_or_acquire(self, client=None):
         if client:
             yield client
         else:
-            if self._in_poll:
-                client = self._create_client()
-                try:
-                    yield client
-                finally:
-                    self.pool.release(client.connection)
-            else:
-                yield self.client
+            yield self._create_client()
 
     @property
     def pool(self):
@@ -856,15 +877,21 @@
             self._pool = self._get_pool()
         return self._pool
 
+    @property
+    def async_pool(self):
+        if self._async_pool is None:
+            self._async_pool = self._get_pool(async=True)
+        return self._async_pool
+
     @cached_property
     def client(self):
         """Client used to publish messages, BRPOP etc."""
-        return self._create_client()
+        return self._create_client(async=True)
 
     @cached_property
     def subclient(self):
         """Pub/Sub connection used to consume fanout queues."""
-        client = self._create_client()
+        client = self._create_client(async=True)
         pubsub = client.pubsub()
         pool = pubsub.connection_pool
         pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu/utils/__init__.py 
new/kombu-3.0.33/kombu/utils/__init__.py
--- old/kombu-3.0.29/kombu/utils/__init__.py    2015-10-26 18:43:47.000000000 
+0100
+++ new/kombu-3.0.33/kombu/utils/__init__.py    2016-01-08 23:35:32.000000000 
+0100
@@ -16,7 +16,11 @@
 from itertools import count, repeat
 from functools import wraps
 from time import sleep
-from uuid import UUID, uuid4 as _uuid4, _uuid_generate_random
+from uuid import UUID, uuid4
+try:
+    from uuid import _uuid_generate_random
+except ImportError:
+    _uuid_generate_random = None
 
 from kombu.five import items, reraise, string_t
 
@@ -140,13 +144,12 @@
     print(str(m).format(*fargs, **fkwargs), file=sys.stderr)
 
 
-def uuid4():
-    # Workaround for http://bugs.python.org/issue4607
-    if ctypes and _uuid_generate_random:  # pragma: no cover
+if ctypes and _uuid_generate_random:  # pragma: no cover
+    def uuid4():
+        # Workaround for http://bugs.python.org/issue4607
         buffer = ctypes.create_string_buffer(16)
         _uuid_generate_random(buffer)
         return UUID(bytes=buffer.raw)
-    return _uuid4()
 
 
 def uuid():
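
The utils hunk above is the uuid fix for Python 2.7.11 and 3.5.1, where
uuid._uuid_generate_random is no longer importable. A condensed sketch of the
same pattern, using a hypothetical helper name:

    import ctypes
    from uuid import UUID, uuid4

    try:
        from uuid import _uuid_generate_random
    except ImportError:                  # removed in 2.7.11 / 3.5.1
        _uuid_generate_random = None

    def fast_uuid4():
        # Prefer the C generator when the interpreter still ships it,
        # otherwise fall back to the stdlib implementation.
        if _uuid_generate_random is not None:
            buf = ctypes.create_string_buffer(16)
            _uuid_generate_random(buf)
            return UUID(bytes=buf.raw)
        return uuid4()
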
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu.egg-info/PKG-INFO 
new/kombu-3.0.33/kombu.egg-info/PKG-INFO
--- old/kombu-3.0.29/kombu.egg-info/PKG-INFO    2015-10-26 19:13:11.000000000 
+0100
+++ new/kombu-3.0.33/kombu.egg-info/PKG-INFO    2016-01-09 03:38:05.000000000 
+0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.29
+Version: 3.0.33
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.29
+        :Version: 3.0.33
         
         `Kombu` is a messaging library for Python.
         
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu.egg-info/SOURCES.txt 
new/kombu-3.0.33/kombu.egg-info/SOURCES.txt
--- old/kombu-3.0.29/kombu.egg-info/SOURCES.txt 2015-10-26 19:13:11.000000000 
+0100
+++ new/kombu-3.0.33/kombu.egg-info/SOURCES.txt 2016-01-09 03:38:05.000000000 
+0100
@@ -107,6 +107,8 @@
 examples/simple_task_queue/tasks.py
 examples/simple_task_queue/worker.py
 extra/doc2ghpages
+extra/appveyor/install.ps1
+extra/appveyor/run_with_compiler.cmd
 extra/release/bump_version.py
 extra/release/doc4allmods
 extra/release/flakeplus.py
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/kombu.egg-info/requires.txt 
new/kombu-3.0.33/kombu.egg-info/requires.txt
--- old/kombu-3.0.29/kombu.egg-info/requires.txt        2015-10-26 
19:13:11.000000000 +0100
+++ new/kombu-3.0.33/kombu.egg-info/requires.txt        2016-01-09 
03:38:05.000000000 +0100
@@ -1,5 +1,5 @@
 anyjson>=0.3.3
-amqp>=1.4.7,<2.0
+amqp>=1.4.9,<2.0
 
 [sqlalchemy]
 sqlalchemy
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-3.0.29/requirements/default.txt 
new/kombu-3.0.33/requirements/default.txt
--- old/kombu-3.0.29/requirements/default.txt   2015-10-26 18:43:47.000000000 
+0100
+++ new/kombu-3.0.33/requirements/default.txt   2016-01-09 03:06:52.000000000 
+0100
@@ -1,2 +1,2 @@
 anyjson>=0.3.3
-amqp>=1.4.7,<2.0
+amqp>=1.4.9,<2.0

