Hello community,
here is the log from the commit of package python-stomp.py for openSUSE:Factory
checked in at 2026-05-20 15:24:55
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-stomp.py (Old)
and /work/SRC/openSUSE:Factory/.python-stomp.py.new.1966 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-stomp.py"
Wed May 20 15:24:55 2026 rev:16 rq:1354125 version:9.0.0
Changes:
--------
--- /work/SRC/openSUSE:Factory/python-stomp.py/python-stomp.py.changes 2026-02-12 17:28:21.053794874 +0100
+++ /work/SRC/openSUSE:Factory/.python-stomp.py.new.1966/python-stomp.py.changes 2026-05-20 15:26:07.554106564 +0200
@@ -1,0 +2,11 @@
+Tue May 19 21:59:43 UTC 2026 - Dirk Müller <[email protected]>
+
+- update to 9.0.0:
+ * Update to docopt-ng
+ * Pass ssl version from ssl_params
+ * Use single thread for IO when using TLS
+ * Added SSL error message from cert validation to log warning
+ * Rename stomp.logging to stomp.log, since python doesn't "like"
+ overloading the logging module
+
+-------------------------------------------------------------------
Old:
----
stomp_py-8.2.0.tar.gz
New:
----
stomp_py-9.0.0.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-stomp.py.spec ++++++
--- /var/tmp/diff_new_pack.IiTVD9/_old 2026-05-20 15:26:09.142172000 +0200
+++ /var/tmp/diff_new_pack.IiTVD9/_new 2026-05-20 15:26:09.142172000 +0200
@@ -18,7 +18,7 @@
%{?sle15_python_module_pythons}
Name: python-stomp.py
-Version: 8.2.0
+Version: 9.0.0
Release: 0
Summary: Python STOMP client
License: Apache-2.0
++++++ stomp_py-8.2.0.tar.gz -> stomp_py-9.0.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/PKG-INFO new/stomp_py-9.0.0/PKG-INFO
--- old/stomp_py-8.2.0/PKG-INFO 1970-01-01 01:00:00.000000000 +0100
+++ new/stomp_py-9.0.0/PKG-INFO 1970-01-01 01:00:00.000000000 +0100
@@ -1,28 +1,26 @@
-Metadata-Version: 2.1
+Metadata-Version: 2.4
Name: stomp.py
-Version: 8.2.0
+Version: 9.0.0
Summary: Python STOMP client, supporting versions 1.0, 1.1 and 1.2 of the protocol
-Home-page: https://github.com/jasonrbriggs/stomp.py
License: Apache-2.0
+License-File: LICENSE
Keywords: stomp,messaging,events,client
Author: Jason R Briggs
Author-email: [email protected]
-Requires-Python: >=3.7,<4.0
+Requires-Python: >=3.10,<4.0
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3.7
-Classifier: Programming Language :: Python :: 3.8
-Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
-Requires-Dist: docopt (>=0.6.2,<0.7.0)
+Classifier: Programming Language :: Python :: 3.14
+Requires-Dist: docopt-ng (>=0.9.0,<0.10.0)
Requires-Dist: websocket-client (>=1.2.3,<2.0.0)
-Project-URL: Documentation, http://jasonrbriggs.github.io/stomp.py/
-Project-URL: Repository, https://github.com/jasonrbriggs/stomp.py
+Project-URL: Documentation, https://jasonrbriggs.codeberg.page/stomp.py.pages/
+Project-URL: Repository, https://codeberg.org/jasonrbriggs/stomp.py
Description-Content-Type: text/x-rst
========
@@ -33,7 +31,7 @@
:target: https://badge.fury.io/py/stomp.py
:alt: PyPI version
-"stomp.py" is a Python client library for accessing messaging servers (such as ActiveMQ_, Artemis_ or RabbitMQ_) using the STOMP_ protocol (`STOMP v1.0`_, `STOMP v1.1`_ and `STOMP v1.2`_). It can also be run as a standalone, command-line client for testing. NOTE: Stomp.py has officially ended support for Python2.x. See `python3statement.org`_ for more information.
+"stomp.py" is a Python client library for accessing messaging servers (such as `ActiveMQ Classic`_, `ActiveMQ Artemis`_ or RabbitMQ_) using the STOMP_ protocol (`STOMP v1.0`_, `STOMP v1.1`_ and `STOMP v1.2`_). It can also be run as a standalone, command-line client for testing. NOTE: Stomp.py has officially ended support for Python2.x. See `python3statement.org`_ for more information.
.. contents:: \
:depth: 1
@@ -61,8 +59,6 @@
- `API documentation`_ (see `stomp.github.io`_ for details on the STOMP protocol itself)
- A basic example of using stomp.py with a message listener can be found in the `quick start`_ section of the main documentation
- Description of the `command-line interface`_
-- `Travis`_ for continuous integration builds
-- Current `test coverage report`_
- `PyPi stomp.py page`_
The current version of stomp.py supports:
@@ -85,10 +81,10 @@
stomp.py has been perfunctorily tested on:
-- Pivotal `RabbitMQ`_ (`test_rabbitmq.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_rabbitmq.py>`_)
-- Apache `ActiveMQ`_ (`test_activemq.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_activemq.py>`_)
-- Apache ActiveMQ `Artemis`_ (`test_artemis.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_artemis.py>`_)
-- `stompserver`_ (`test_stompserver.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_stompserver.py>`_)
+- Pivotal `RabbitMQ`_ (`test_rabbitmq.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_rabbitmq.py>`_)
+- Apache `ActiveMQ Classic`_ (`test_activemq.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_activemq.py>`_)
+- Apache `ActiveMQ Artemis`_ (`test_artemis.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_artemis.py>`_)
+- `stompserver`_ (`test_stompserver.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_stompserver.py>`_)
For testing locally, you'll need to install docker (or `podman`_). Once installed:
@@ -121,20 +117,18 @@
.. _`STOMP v1.2`: http://stomp.github.io/stomp-specification-1.2.html
.. _`python3statement.org`: http://python3statement.org/
-.. _`Main documentation`: http://jasonrbriggs.github.io/stomp.py/index.html
+.. _`Main documentation`: https://jasonrbriggs.codeberg.page/stomp.py.pages/
.. _`stomp.github.io`: http://stomp.github.io/
-.. _`quick start`: http://jasonrbriggs.github.io/stomp.py/quickstart.html
-.. _`command-line interface`: http://jasonrbriggs.github.io/stomp.py/commandline.html
+.. _`quick start`: https://jasonrbriggs.codeberg.page/stomp.py.pages/quickstart.html
+.. _`command-line interface`: https://jasonrbriggs.codeberg.page/stomp.py.pages/commandline.html
.. _`PyPi stomp.py page`: https://pypi.org/project/stomp.py/
-.. _`API documentation`: http://jasonrbriggs.github.io/stomp.py/api.html
-.. _`test coverage report`: http://jasonrbriggs.github.io/stomp.py/htmlcov/
-.. _`Travis`: https://travis-ci.org/jasonrbriggs/stomp.py
+.. _`API documentation`: https://jasonrbriggs.codeberg.page/stomp.py.pages/api.html
.. _`3.1.7 on PyPi`: https://pypi.org/project/stomp.py/3.1.7/
.. _`3.1.7 on GitHub`: https://github.com/jasonrbriggs/stomp.py/tree/stomppy-3series
-.. _`ActiveMQ`: http://activemq.apache.org/
-.. _`Artemis`: https://activemq.apache.org/components/artemis/
+.. _`ActiveMQ Classic`: http://activemq.apache.org/components/classic
+.. _`ActiveMQ Artemis`: https://activemq.apache.org/components/artemis/
.. _`RabbitMQ`: http://www.rabbitmq.com
.. _`stompserver`: http://stompserver.rubyforge.org
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/README.rst new/stomp_py-9.0.0/README.rst
--- old/stomp_py-8.2.0/README.rst 2022-08-01 09:39:51.672854400 +0200
+++ new/stomp_py-9.0.0/README.rst 1970-01-01 01:00:00.000000000 +0100
@@ -6,7 +6,7 @@
:target: https://badge.fury.io/py/stomp.py
:alt: PyPI version
-"stomp.py" is a Python client library for accessing messaging servers (such as ActiveMQ_, Artemis_ or RabbitMQ_) using the STOMP_ protocol (`STOMP v1.0`_, `STOMP v1.1`_ and `STOMP v1.2`_). It can also be run as a standalone, command-line client for testing. NOTE: Stomp.py has officially ended support for Python2.x. See `python3statement.org`_ for more information.
+"stomp.py" is a Python client library for accessing messaging servers (such as `ActiveMQ Classic`_, `ActiveMQ Artemis`_ or RabbitMQ_) using the STOMP_ protocol (`STOMP v1.0`_, `STOMP v1.1`_ and `STOMP v1.2`_). It can also be run as a standalone, command-line client for testing. NOTE: Stomp.py has officially ended support for Python2.x. See `python3statement.org`_ for more information.
.. contents:: \
:depth: 1
@@ -34,8 +34,6 @@
- `API documentation`_ (see `stomp.github.io`_ for details on the STOMP protocol itself)
- A basic example of using stomp.py with a message listener can be found in the `quick start`_ section of the main documentation
- Description of the `command-line interface`_
-- `Travis`_ for continuous integration builds
-- Current `test coverage report`_
- `PyPi stomp.py page`_
The current version of stomp.py supports:
@@ -58,10 +56,10 @@
stomp.py has been perfunctorily tested on:
-- Pivotal `RabbitMQ`_ (`test_rabbitmq.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_rabbitmq.py>`_)
-- Apache `ActiveMQ`_ (`test_activemq.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_activemq.py>`_)
-- Apache ActiveMQ `Artemis`_ (`test_artemis.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_artemis.py>`_)
-- `stompserver`_ (`test_stompserver.py <https://github.com/jasonrbriggs/stomp.py/blob/dev/tests/test_stompserver.py>`_)
+- Pivotal `RabbitMQ`_ (`test_rabbitmq.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_rabbitmq.py>`_)
+- Apache `ActiveMQ Classic`_ (`test_activemq.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_activemq.py>`_)
+- Apache `ActiveMQ Artemis`_ (`test_artemis.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_artemis.py>`_)
+- `stompserver`_ (`test_stompserver.py <https://codeberg.org/jasonrbriggs/stomp.py/src/branch/dev/tests/test_stompserver.py>`_)
For testing locally, you'll need to install docker (or `podman`_). Once installed:
@@ -94,20 +92,18 @@
.. _`STOMP v1.2`: http://stomp.github.io/stomp-specification-1.2.html
.. _`python3statement.org`: http://python3statement.org/
-.. _`Main documentation`: http://jasonrbriggs.github.io/stomp.py/index.html
+.. _`Main documentation`: https://jasonrbriggs.codeberg.page/stomp.py.pages/
.. _`stomp.github.io`: http://stomp.github.io/
-.. _`quick start`: http://jasonrbriggs.github.io/stomp.py/quickstart.html
-.. _`command-line interface`: http://jasonrbriggs.github.io/stomp.py/commandline.html
+.. _`quick start`: https://jasonrbriggs.codeberg.page/stomp.py.pages/quickstart.html
+.. _`command-line interface`: https://jasonrbriggs.codeberg.page/stomp.py.pages/commandline.html
.. _`PyPi stomp.py page`: https://pypi.org/project/stomp.py/
-.. _`API documentation`: http://jasonrbriggs.github.io/stomp.py/api.html
-.. _`test coverage report`: http://jasonrbriggs.github.io/stomp.py/htmlcov/
-.. _`Travis`: https://travis-ci.org/jasonrbriggs/stomp.py
+.. _`API documentation`: https://jasonrbriggs.codeberg.page/stomp.py.pages/api.html
.. _`3.1.7 on PyPi`: https://pypi.org/project/stomp.py/3.1.7/
.. _`3.1.7 on GitHub`: https://github.com/jasonrbriggs/stomp.py/tree/stomppy-3series
-.. _`ActiveMQ`: http://activemq.apache.org/
-.. _`Artemis`: https://activemq.apache.org/components/artemis/
+.. _`ActiveMQ Classic`: http://activemq.apache.org/components/classic
+.. _`ActiveMQ Artemis`: https://activemq.apache.org/components/artemis/
.. _`RabbitMQ`: http://www.rabbitmq.com
.. _`stompserver`: http://stompserver.rubyforge.org
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/pyproject.toml new/stomp_py-9.0.0/pyproject.toml
--- old/stomp_py-8.2.0/pyproject.toml 2024-04-13 21:00:48.014782700 +0200
+++ new/stomp_py-9.0.0/pyproject.toml 1970-01-01 01:00:00.000000000 +0100
@@ -1,12 +1,12 @@
[tool.poetry]
name = "stomp.py"
-version = "8.2.0"
+version = "9.0.0"
description = "Python STOMP client, supporting versions 1.0, 1.1 and 1.2 of the protocol"
authors = ["Jason R Briggs <[email protected]>"]
license = "Apache-2.0"
readme = "README.rst"
-repository = "https://github.com/jasonrbriggs/stomp.py"
-documentation = "http://jasonrbriggs.github.io/stomp.py/"
+repository = "https://codeberg.org/jasonrbriggs/stomp.py"
+documentation = "https://jasonrbriggs.codeberg.page/stomp.py.pages/"
keywords = ["stomp", "messaging", "events", "client"]
classifiers = [
"Development Status :: 5 - Production/Stable",
@@ -19,15 +19,15 @@
]
[tool.poetry.dependencies]
-python = "^3.7"
-docopt = "^0.6.2"
+python = "^3.10"
+docopt-ng = "^0.9.0"
websocket-client = "^1.2.3"
-[tool.poetry.dev-dependencies]
-pytest = ">=7.2.0"
-pytest-cov = ">=4.0.0"
-pytest-mock = ">=3.10.0"
-pytest-html = ">=3.2.0"
+[tool.poetry.group.dev.dependencies]
+pytest = "^9.0.3"
+pytest-cov = ">=6.0.0"
+pytest-mock = ">=3.14.0"
+pytest-html = ">=4.1.1"
pytest-ordering = ">=0.6"
[tool.poetry.scripts]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/__init__.py new/stomp_py-9.0.0/stomp/__init__.py
--- old/stomp_py-8.2.0/stomp/__init__.py 2024-04-13 21:20:55.169288600 +0200
+++ new/stomp_py-9.0.0/stomp/__init__.py 1970-01-01 01:00:00.000000000 +0100
@@ -12,9 +12,9 @@
import stomp.adapter as adapter
import stomp.connect as connect
import stomp.listener as listener
-import stomp.logging as logging
+import stomp.log as log
-__version__ = "8.2.0"
+__version__ = "9.0.0"
##
# Alias for STOMP 1.0 connections.
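With the rename in this hunk, code that imports the helper as `stomp.logging` will no longer find it in 9.0.0. A minimal sketch of a version-tolerant import, assuming the caller only needs the module object; the helper name `load_stomp_log` is hypothetical and not part of stomp.py:

```python
import importlib


def load_stomp_log():
    """Return stomp.py's logging helper module, or None if it cannot be imported.

    Tries the 9.0.0 module name first, then the pre-9.0.0 name.
    """
    for name in ("stomp.log", "stomp.logging"):
        try:
            return importlib.import_module(name)
        except ImportError:
            # Covers both "stomp.py is not installed" and "this name does not exist"
            continue
    return None


stomp_log = load_stomp_log()
if stomp_log is not None:
    # debug() is one of the logger methods both module versions re-export
    stomp_log.debug("logging helper loaded from %s", stomp_log.__name__)
```

Packages that only need to support 9.0.0 and later can simply switch to `from stomp import log`.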
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/__main__.py new/stomp_py-9.0.0/stomp/__main__.py
--- old/stomp_py-8.2.0/stomp/__main__.py 2023-03-22 22:16:43.493015800 +0100
+++ new/stomp_py-9.0.0/stomp/__main__.py 1970-01-01 01:00:00.000000000 +0100
@@ -25,6 +25,7 @@
--ssl-key-file=<key-file> ssl key file
--ssl-cert-file=<cert-file> ssl cert file
--ssl-ca-file=<ca-file> ssl ca certs file
+ --ssl-insecure disable cert validation
"""
@@ -67,7 +68,7 @@
for more information on establishing a connection to a stomp server.
"""
def __init__(self, host="localhost", port=61613, user="", passcode="", ver="1.1", prompt="> ", verbose=True,
- heartbeats=(0, 0), use_ssl=False, ssl_key_file=None, ssl_cert_file=None, ssl_ca_file=None,
+ heartbeats=(0, 0), use_ssl=False, ssl_key_file=None, ssl_cert_file=None, ssl_ca_file=None, ssl_insecure=False,
stdin=sys.stdin, stdout=sys.stdout):
Cmd.__init__(self, "Tab", stdin, stdout)
ConnectionListener.__init__(self)
@@ -88,7 +89,7 @@
else:
raise RuntimeError("Unknown version")
if use_ssl:
- self.conn.set_ssl([(host, port)], key_file=ssl_key_file, cert_file=ssl_cert_file, ca_certs=ssl_ca_file)
+ self.conn.set_ssl([(host, port)], key_file=ssl_key_file, cert_file=ssl_cert_file, ca_certs=ssl_ca_file, verify=not ssl_insecure)
self.conn.set_listener("", self)
self.conn.connect(self.user, self.passcode, wait=True)
self.transaction_id = None
@@ -368,7 +369,7 @@
def do_version(self, args):
self.__sysout("%s%s [Protocol version %s]%s" %
- (stomp.colours.BOLD, stomp.version, self.conn.version, stomp.colours.NO_COLOUR))
+ (stomp.colours.BOLD, stomp.__version__, self.conn.version, stomp.colours.NO_COLOUR))
do_ver = do_version
def help_version(self):
@@ -520,7 +521,8 @@
use_ssl=arguments["--ssl"],
ssl_key_file=arguments["--ssl-key-file"],
ssl_cert_file=arguments["--ssl-cert-file"],
- ssl_ca_file=arguments["--ssl-ca-file"])
+ ssl_ca_file=arguments["--ssl-ca-file"],
+ ssl_insecure=arguments["--ssl-insecure"])
if arguments["--listen"] is not None:
st.do_subscribe(arguments["--listen"])
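The new `--ssl-insecure` option reaches the connection as `verify=not ssl_insecure`. How `set_ssl` acts on `verify` is not shown in this diff. As a rough illustration of what disabling certificate validation usually means with Python's standard `ssl` module, here is a sketch; `make_context` is a hypothetical helper and is not stomp.py code:

```python
import ssl


def make_context(insecure=False):
    """Build a client-side TLS context; skip certificate checks when insecure is True."""
    ctx = ssl.create_default_context()
    if insecure:
        # check_hostname has to be switched off before verify_mode may be CERT_NONE
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


ssl_insecure = True            # as if --ssl-insecure had been given
verify = not ssl_insecure      # the value the CLI passes on
ctx = make_context(insecure=not verify)
print("certificate validation enabled:", ctx.verify_mode != ssl.CERT_NONE)
```

Turning validation off is generally only appropriate against test brokers with self-signed certificates.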
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/adapter/multicast.py new/stomp_py-9.0.0/stomp/adapter/multicast.py
--- old/stomp_py-8.2.0/stomp/adapter/multicast.py 2022-11-13 17:44:23.491555700 +0100
+++ new/stomp_py-9.0.0/stomp/adapter/multicast.py 1970-01-01 01:00:00.000000000 +0100
@@ -81,7 +81,7 @@
receipt_frame = Frame("RECEIPT", {"receipt-id": f.headers["receipt"]})
lines = convert_frame(receipt_frame)
self.send(encode(pack(lines)))
- logging.debug("received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
+ log.debug("received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
def stop(self):
self.running = False
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/adapter/ws.py new/stomp_py-9.0.0/stomp/adapter/ws.py
--- old/stomp_py-8.2.0/stomp/adapter/ws.py 2024-04-12 23:20:52.221595800 +0200
+++ new/stomp_py-9.0.0/stomp/adapter/ws.py 1970-01-01 01:00:00.000000000 +0100
@@ -33,7 +33,7 @@
from stomp.connect import BaseConnection, StompConnection12
from stomp.protocol import Protocol12
from stomp.exception import *
-from stomp import logging
+from stomp import log
class WSTransport(BaseTransport):
@@ -101,7 +101,7 @@
BaseTransport.__init__(self, auto_decode, encoding, is_eol_fc)
if host_and_ports is None:
- logging.debug("no hosts_and_ports specified, adding default localhost")
+ log.debug("no hosts_and_ports specified, adding default localhost")
host_and_ports = [("localhost", 61613)]
sorted_host_and_ports = []
@@ -179,7 +179,7 @@
_, e, _ = sys.exc_info()
# ignore when socket already closed
if get_errno(e) != errno.ENOTCONN:
- logging.warning("Unable to issue SHUT_RDWR on socket because of error '%s'", e)
+ log.warning("Unable to issue SHUT_RDWR on socket because of error '%s'", e)
#
# split this into a separate check, because sometimes the socket is nulled between shutdown and this call
@@ -189,7 +189,7 @@
self.socket.close()
except socket.error:
_, e, _ = sys.exc_info()
- logging.warning("unable to close socket because of error '%s'", e)
+ log.warning("unable to close socket because of error '%s'", e)
self.current_host_and_port = None
self.socket = None
if not self.notified_on_disconnect:
@@ -210,7 +210,7 @@
self.socket.send(encoded_frame, opcode)
except Exception:
_, e, _ = sys.exc_info()
- logging.error("error sending frame", exc_info=True)
+ log.error("error sending frame", exc_info=True)
raise e
else:
raise NotConnectedException()
@@ -225,7 +225,7 @@
except socket.error:
_, e, _ = sys.exc_info()
if get_errno(e) in (errno.EAGAIN, errno.EINTR):
- logging.debug("socket read interrupted, restarting")
+ log.debug("socket read interrupted, restarting")
raise InterruptedException()
if self.is_connected():
raise
@@ -246,9 +246,9 @@
return True # no value to set always works
try:
sock.setsockopt(fam, opt, val)
- logging.info("keepalive: set %r option to %r on socket", name, val)
+ log.info("keepalive: set %r option to %r on socket", name, val)
except:
- logging.error("keepalive: unable to set %r option to %r on socket", name, val)
+ log.error("keepalive: unable to set %r option to %r on socket", name, val)
return False
return True
@@ -265,26 +265,26 @@
ka_sig = ka[0]
ka_args = ka[1:]
except Exception:
- logging.error("keepalive: bad specification %r", ka)
+ log.error("keepalive: bad specification %r", ka)
return
if ka_sig == "auto":
if LINUX_KEEPALIVE_AVAIL:
ka_sig = "linux"
ka_args = None
- logging.info("keepalive: autodetected linux-style support")
+ log.info("keepalive: autodetected linux-style support")
elif MAC_KEEPALIVE_AVAIL:
ka_sig = "mac"
ka_args = None
- logging.info("keepalive: autodetected mac-style support")
+ log.info("keepalive: autodetected mac-style support")
else:
- logging.error("keepalive: unable to detect any implementation, DISABLED!")
+ log.error("keepalive: unable to detect any implementation, DISABLED!")
return
if ka_sig == "linux":
- logging.info("keepalive: activating linux-style support")
+ log.info("keepalive: activating linux-style support")
if ka_args is None:
- logging.info("keepalive: using system defaults")
+ log.info("keepalive: using system defaults")
ka_args = (None, None, None)
ka_idle, ka_intvl, ka_cnt = ka_args
if try_setsockopt(self.socket, "enable", SOL_SOCKET, SO_KEEPALIVE, 1):
@@ -292,15 +292,15 @@
try_setsockopt(self.socket, "interval", SOL_TCP, TCP_KEEPINTVL, ka_intvl)
try_setsockopt(self.socket, "count", SOL_TCP, TCP_KEEPCNT, ka_cnt)
elif ka_sig == "mac":
- logging.info("keepalive: activating mac-style support")
+ log.info("keepalive: activating mac-style support")
if ka_args is None:
- logging.info("keepalive: using system defaults")
+ log.info("keepalive: using system defaults")
ka_args = (3,)
ka_intvl = ka_args
if try_setsockopt(self.socket, "enable", SOL_SOCKET, SO_KEEPALIVE, 1):
try_setsockopt(self.socket, socket.IPPROTO_TCP, 0x10, ka_intvl)
else:
- logging.error("keepalive: implementation %r not recognized or not supported", ka_sig)
+ log.error("keepalive: implementation %r not recognized or not supported", ka_sig)
def attempt_connection(self):
"""
@@ -310,12 +310,12 @@
sleep_exp = 1
connect_count = 0
- logging.info("attempt reconnection (%s, %s, %s)", self.running, self.socket, connect_count)
+ log.info("attempt reconnection (%s, %s, %s)", self.running, self.socket, connect_count)
while self.running and self.socket is None and (connect_count < self.__reconnect_attempts_max or
self.__reconnect_attempts_max == -1):
for host_and_port in self.__host_and_ports:
try:
- logging.info("attempting connection to host %s, port %s", host_and_port[0], host_and_port[1])
+ log.info("attempting connection to host %s, port %s", host_and_port[0], host_and_port[1])
#websocket.enableTrace(True)
self.current_host_and_port = host_and_port
path = "/"
@@ -336,13 +336,13 @@
header=self.header,
sslopt=self.get_ssl()
)
- logging.info("established connection to host %s, port %s", host_and_port[0], host_and_port[1])
+ log.info("established connection to host %s, port %s", host_and_port[0], host_and_port[1])
break
except (OSError, AssertionError, websocket._exceptions.WebSocketException) as exc:
self.socket = None
connect_count += 1
- logging.warning("Could not connect to host %s, port %s: %s", host_and_port[0], host_and_port[1],
- str(exc), exc_info=logging.verbose)
+ log.warning("Could not connect to host %s, port %s: %s", host_and_port[0], host_and_port[1],
+ str(exc), exc_info=log.verbose)
if self.socket is None:
sleep_duration = (min(self.__reconnect_sleep_max,
@@ -350,7 +350,7 @@
* math.pow(1.0 + self.__reconnect_sleep_increase, sleep_exp)))
* (1.0 + random.random() * self.__reconnect_sleep_jitter))
sleep_end = monotonic() + sleep_duration
- logging.debug("sleeping for %.1f seconds before attempting reconnect", sleep_duration)
+ log.debug("sleeping for %.1f seconds before attempting reconnect", sleep_duration)
while self.running and monotonic() < sleep_end:
time.sleep(0.2)
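The reconnect loop in the hunks above sleeps for a capped, exponentially growing interval with random jitter. The hunk boundary hides part of the expression, so the sketch below assumes the hidden factor is an initial sleep divided by `1 + increase`. The function name, parameter names and default values are hypothetical stand-ins for the transport's private attributes:

```python
import math
import random


def reconnect_sleep(sleep_exp, initial=0.1, increase=0.5, maximum=60.0,
                    jitter=0.1, rng=random.random):
    """Return the seconds to wait before reconnect attempt number `sleep_exp`."""
    # Assumed hidden term: dividing by (1 + increase) makes attempt 1 wait `initial`
    base = (initial / (1.0 + increase)) * math.pow(1.0 + increase, sleep_exp)
    # The cap is applied before the jitter, so the result can exceed `maximum`
    # by up to maximum * jitter, as in the visible part of the expression.
    return min(maximum, base) * (1.0 + rng() * jitter)


for attempt in range(1, 6):
    print(attempt, round(reconnect_sleep(attempt, rng=lambda: 0.0), 4))
```

Passing `rng` explicitly is only there to make the sketch deterministic when inspecting the growth curve.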
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/connect.py new/stomp_py-9.0.0/stomp/connect.py
--- old/stomp_py-8.2.0/stomp/connect.py 2022-11-13 17:44:23.503555800 +0100
+++ new/stomp_py-9.0.0/stomp/connect.py 1970-01-01 01:00:00.000000000 +0100
@@ -105,6 +105,7 @@
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""
Call the protocol disconnection, and then stop the transport itself.
+ Provide a receipt in order to drain the send queue before returning.
:param str receipt: the receipt to use with the disconnect
:param dict headers: a map of any additional headers to send with the disconnection
@@ -153,6 +154,7 @@
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""
Call the protocol disconnection, and then stop the transport itself.
+ Provide a receipt in order to drain the send queue before returning.
:param str receipt: the receipt to use with the disconnect
:param dict headers: a map of any additional headers to send with the disconnection
@@ -201,6 +203,7 @@
def disconnect(self, receipt=None, headers=None, **keyword_headers):
"""
Call the protocol disconnection, and then stop the transport itself.
+ Provide a receipt in order to drain the send queue before returning.
:param str receipt: the receipt to use with the disconnect
:param dict headers: a map of any additional headers to send with the disconnection
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/listener.py new/stomp_py-9.0.0/stomp/listener.py
--- old/stomp_py-8.2.0/stomp/listener.py 2024-10-31 22:58:23.776828800 +0100
+++ new/stomp_py-9.0.0/stomp/listener.py 1970-01-01 01:00:00.000000000 +0100
@@ -9,7 +9,7 @@
import stomp.exception as exception
import stomp.utils as utils
from stomp.constants import *
-from stomp import logging
+from stomp import log
class Publisher(object):
@@ -178,7 +178,7 @@
if "heart-beat" in frame.headers:
self.heartbeats = utils.calculate_heartbeats(
frame.headers["heart-beat"].replace(" ", "").split(","), self.heartbeats)
- logging.debug("heartbeats calculated %s", str(self.heartbeats))
+ log.debug("heartbeats calculated %s", str(self.heartbeats))
if self.heartbeats != (0, 0):
self.send_sleep = self.heartbeats[0] / 1000
@@ -186,7 +186,7 @@
# set a different heart-beat-receive-scale when creating the connection to override that
self.receive_sleep = (self.heartbeats[1] / 1000) * self.heart_beat_receive_scale
- logging.debug("set receive_sleep to %s, send_sleep to %s", self.receive_sleep, self.send_sleep)
+ log.debug("set receive_sleep to %s, send_sleep to %s", self.receive_sleep, self.send_sleep)
# Give grace of receiving the first heartbeat
self.received_heartbeat = monotonic() + self.receive_sleep
@@ -257,13 +257,13 @@
"""
Main loop for sending (and monitoring received) heartbeats.
"""
- logging.debug("starting heartbeat loop")
+ log.debug("starting heartbeat loop")
now = monotonic()
# Setup the initial due time for the outbound heartbeat
if self.send_sleep != 0:
self.next_outbound_heartbeat = now + self.send_sleep
- logging.debug("calculated next outbound heartbeat as %s", self.next_outbound_heartbeat)
+ log.debug("calculated next outbound heartbeat as %s", self.next_outbound_heartbeat)
while not self.heartbeat_terminate_event.is_set():
now = monotonic()
@@ -288,21 +288,21 @@
continue
if self.send_sleep != 0 and now > self.next_outbound_heartbeat:
- logging.debug("sending a heartbeat message at %s", now)
+ log.debug("sending a heartbeat message at %s", now)
try:
self.transport.transmit(utils.Frame(None, {}, None))
except exception.NotConnectedException:
- logging.debug("lost connection, unable to send heartbeat")
+ log.debug("lost connection, unable to send heartbeat")
except Exception:
_, e, _ = sys.exc_info()
- logging.debug("unable to send heartbeat, due to: %s", e)
+ log.debug("unable to send heartbeat, due to: %s", e)
if self.receive_sleep != 0:
diff_receive = now - self.received_heartbeat
if diff_receive > self.receive_sleep:
# heartbeat timeout
- logging.warning("heartbeat timeout: diff_receive=%s, time=%s, lastrec=%s",
+ log.warning("heartbeat timeout: diff_receive=%s, time=%s, lastrec=%s",
diff_receive, now, self.received_heartbeat)
self.transport.set_connected(False)
self.transport.disconnect_socket()
@@ -312,7 +312,7 @@
self.heartbeat_thread = None
if self.heartbeats != (0, 0):
# don't bother logging this if heartbeats weren't setup to start with
- logging.debug("heartbeat loop ended")
+ log.debug("heartbeat loop ended")
class WaitingListener(ConnectionListener):
@@ -388,7 +388,7 @@
Increment the disconnect count. See :py:meth:`ConnectionListener.on_disconnected`
"""
self.disconnects += 1
- logging.info("disconnected (x %s)", self.disconnects)
+ log.info("disconnected (x %s)", self.disconnects)
def on_error(self, frame):
"""
@@ -396,10 +396,10 @@
:param Frame frame: the stomp frame
"""
- if logging.isEnabledFor(logging.DEBUG):
- logging.debug("received an error %s [%s]", frame.body, frame.headers)
+ if log.isEnabledFor(log.DEBUG):
+ log.debug("received an error %s [%s]", frame.body, frame.headers)
else:
- logging.info("received an error %s", frame.body)
+ log.info("received an error %s", frame.body)
self.errors += 1
def on_connecting(self, host_and_port):
@@ -408,7 +408,7 @@
:param (str,int) host_and_port: the host and port as a tuple
"""
- logging.info("connecting %s %s (x %s)", host_and_port[0], host_and_port[1], self.connections)
+ log.info("connecting %s %s (x %s)", host_and_port[0], host_and_port[1], self.connections)
self.connections += 1
def on_message(self, frame):
@@ -431,7 +431,7 @@
"""
Increment the heartbeat timeout. See :py:meth:`ConnectionListener.on_heartbeat_timeout`
"""
- logging.debug("received heartbeat timeout")
+ log.debug("received heartbeat timeout")
self.heartbeat_timeouts += 1
def on_heartbeat(self):
@@ -460,7 +460,7 @@
def __print(self, msg, *args):
if self.print_to_log:
- logging.info(msg, *args)
+ log.info(msg, *args)
else:
print(msg % args)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/log.py new/stomp_py-9.0.0/stomp/log.py
--- old/stomp_py-8.2.0/stomp/log.py 1970-01-01 01:00:00.000000000 +0100
+++ new/stomp_py-9.0.0/stomp/log.py 1970-01-01 01:00:00.000000000 +0100
@@ -0,0 +1,26 @@
+import logging
+
+DEBUG = logging.DEBUG
+INFO = logging.INFO
+
+verbose = False
+
+__logger = logging.getLogger("stomp.py")
+debug = __logger.debug
+info = __logger.info
+warning = __logger.warning
+error = __logger.error
+isEnabledFor = __logger.isEnabledFor
+setLevel = __logger.setLevel
+
+
+def log_to_stdout(verbose_logging=True):
+ import sys
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setLevel(logging.DEBUG)
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(formatter)
+ __logger.addHandler(handler)
+ __logger.setLevel(logging.DEBUG)
+ global verbose
+ verbose = verbose_logging
\ No newline at end of file
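Because the module above wraps a standard logger named "stomp.py", an application can also route the client's log output through the standard `logging` API instead of calling `log_to_stdout`. A small sketch, using an in-memory stream so the result is easy to inspect; the handler setup, format string and messages are illustrative only:

```python
import io
import logging

# Same logger name as in the module above
logger = logging.getLogger("stomp.py")

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)

# Only warnings and above are kept; debug chatter from the client is dropped
logger.setLevel(logging.WARNING)

logger.debug("starting heartbeat loop")
logger.warning("heartbeat timeout")

print(stream.getvalue(), end="")
```

Configuring the logger this way leaves the module-level `verbose` flag untouched, so tracebacks on failed connection attempts stay off unless `log_to_stdout` is used.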
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/logging.py new/stomp_py-9.0.0/stomp/logging.py
--- old/stomp_py-8.2.0/stomp/logging.py 2022-10-22 20:46:40.395770300 +0200
+++ new/stomp_py-9.0.0/stomp/logging.py 1970-01-01 01:00:00.000000000 +0100
@@ -1,26 +0,0 @@
-import logging
-
-DEBUG = logging.DEBUG
-INFO = logging.INFO
-
-verbose = False
-
-__logger = logging.getLogger("stomp.py")
-debug = __logger.debug
-info = __logger.info
-warning = __logger.warning
-error = __logger.error
-isEnabledFor = __logger.isEnabledFor
-setLevel = __logger.setLevel
-
-
-def log_to_stdout(verbose_logging=True):
- import sys
- handler = logging.StreamHandler(sys.stdout)
- handler.setLevel(logging.DEBUG)
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- __logger.addHandler(handler)
- __logger.setLevel(logging.DEBUG)
- global verbose
- verbose = verbose_logging
\ No newline at end of file
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/stomp_py-8.2.0/stomp/protocol.py new/stomp_py-9.0.0/stomp/protocol.py
--- old/stomp_py-8.2.0/stomp/protocol.py 2023-06-29 23:52:07.284372300 +0200
+++ new/stomp_py-9.0.0/stomp/protocol.py 1970-01-01 01:00:00.000000000 +0100
@@ -132,7 +132,7 @@
:param keyword_headers: any additional headers the broker requires
"""
if not self.transport.is_connected():
- logging.debug("not sending disconnect, already disconnected")
+ log.debug("not sending disconnect, already disconnected")
return
headers = utils.merge_headers([headers, keyword_headers])
rec = receipt or utils.get_uuid()
@@ -351,7 +351,7 @@
:param keyword_headers: any additional headers the broker requires
"""
if not self.transport.is_connected():
- logging.debug("not sending disconnect, already disconnected")
+ log.debug("not sending disconnect, already disconnected")
return
headers = utils.merge_headers([headers, keyword_headers])
rec = receipt or utils.get_uuid()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/stomp_py-8.2.0/stomp/transport.py
new/stomp_py-9.0.0/stomp/transport.py
--- old/stomp_py-8.2.0/stomp/transport.py 2024-10-21 23:15:32.070741400
+0200
+++ new/stomp_py-9.0.0/stomp/transport.py 1970-01-01 01:00:00.000000000
+0100
@@ -7,8 +7,10 @@
import random
import sys
import time
+import selectors
from io import BytesIO
from time import monotonic
+from collections import deque
try:
from socket import SOL_SOCKET, SO_KEEPALIVE, SOL_TCP, TCP_KEEPIDLE,
TCP_KEEPINTVL, TCP_KEEPCNT
@@ -36,7 +38,7 @@
import stomp.exception as exception
import stomp.listener
from stomp.utils import *
-from stomp import logging
+from stomp import log
class BaseTransport(stomp.listener.Publisher):
@@ -60,7 +62,9 @@
self.__recvbuf = b""
self.listeners = {}
self.running = False
- self.blocking = None
+ self.blocking = True # Flips to False for TLS connections
+ self.send_queue = None
+ self.sel = None
self.connected = False
self.connection_error = False
self.disconnecting = False
@@ -73,11 +77,11 @@
# function for creating threads used by the connection
self.create_thread_fc = default_create_thread
- self.receiver_thread = None
+ self.io_thread = None
self.__listeners_change_condition = threading.Condition()
- self.__receiver_thread_exit_condition = threading.Condition()
- self.__receiver_thread_exited = False
+ self.__io_thread_exit_condition = threading.Condition()
+ self.__io_thread_exited = False
self.__send_wait_condition = threading.Condition()
self.__connect_wait_condition = threading.Condition()
self.__auto_decode = auto_decode
@@ -87,7 +91,7 @@
def override_threading(self, create_thread_fc):
"""
Override for thread creation. Use an alternate threading library by
- setting this to a function with a single argument (which is the receiver loop callback).
+ setting this to a function with a single argument (which is the io loop callback).
The thread which is returned should be started (ready to run)
:param function create_thread_fc: single argument function for
creating a thread
@@ -107,18 +111,18 @@
"""
self.running = True
self.attempt_connection()
- self.receiver_thread = self.create_thread_fc(self.__receiver_loop)
- logging.debug("created thread %s using func %s", self.receiver_thread, self.create_thread_fc)
+ self.io_thread = self.create_thread_fc(self.__io_loop)
+ log.debug("created thread %s using func %s", self.io_thread, self.create_thread_fc)
self.notify("connecting")
def stop(self):
"""
Stop the connection. Performs a clean shutdown by waiting for the
- receiver thread to exit.
+ io thread to exit.
"""
- with self.__receiver_thread_exit_condition:
- while not self.__receiver_thread_exited and self.is_connected():
- self.__receiver_thread_exit_condition.wait()
+ with self.__io_thread_exit_condition:
+ while not self.__io_thread_exited and self.is_connected():
+ self.__io_thread_exit_condition.wait()
def is_connected(self):
"""
@@ -186,11 +190,11 @@
if frame_type in ["connected", "message", "receipt", "error",
"heartbeat"]:
if frame_type == "message":
self.notify("before_message", f)
- if logging.isEnabledFor(logging.DEBUG):
- logging.debug("received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
+ if log.isEnabledFor(log.DEBUG):
+ log.debug("received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
self.notify(frame_type, f)
else:
- logging.warning("unknown response frame type: '%s' (frame length was %d)", frame_type, length(frame_str))
+ log.warning("unknown response frame type: '%s' (frame length was %d)", frame_type, length(frame_str))
def notify(self, frame_type, frame=None):
"""
@@ -227,7 +231,7 @@
for (_, listener) in listeners:
notify_func = getattr(listener, "on_%s" % frame_type, None)
if not notify_func:
- logging.debug("listener %s has no method on_%s", listener, frame_type)
+ log.debug("listener %s has no method on_%s", listener, frame_type)
continue
if frame_type in ("heartbeat", "disconnected"):
notify_func()
@@ -264,9 +268,14 @@
lines = convert_frame(frame)
packed_frame = pack(lines)
- if logging.isEnabledFor(logging.DEBUG):
- logging.debug("sending frame: %s", clean_lines(lines))
- self.send(packed_frame)
+ if log.isEnabledFor(log.DEBUG):
+ log.debug("sending frame: %s", clean_lines(lines))
+
+ if not self.blocking:
+ # Let the io thread write to the socket
+ self.send_queue.put(packed_frame)
+ else:
+ self.send(packed_frame)
def send(self, encoded_frame):
"""
@@ -318,28 +327,62 @@
if not self.running or not self.is_connected():
raise exception.ConnectFailedException()
- def __receiver_loop(self):
+ def __io_loop(self):
"""
- Main loop listening for incoming data.
+ Main loop listening for incoming data. For non-blocking sockets the
+ loop is also responsible for writing to the socket.
"""
- logging.debug("starting receiver loop (%s)", threading.current_thread())
+ log.debug("starting io loop (%s)", threading.current_thread())
notify_disconnected = True
+ outgoing = deque()
try:
while self.running:
try:
while self.running:
- frames = self.__read()
-
- for frame in frames:
- if self.__is_eol(frame):
- f = HEARTBEAT_FRAME
- else:
- f = parse_frame(frame)
- if f is None:
- continue
- if self.__auto_decode:
- f.body = decode(f.body)
- self.process_frame(f, frame)
+ # Always attempt to read, even for non-blocking
+ # sockets. The TLS socket might have pending incoming
+ # application data even if the underlying TCP socket
+ # has no pending data.
+ self.__read_frames()
+ if not self.blocking:
+ for key, mask in self.sel.select():
+ if key.fileobj == self.socket:
+ if mask & selectors.EVENT_READ:
+ self.__read_frames()
+ if mask & selectors.EVENT_WRITE:
+ if outgoing:
+ # Send the first chunk
+ buf = outgoing[0]
+ try:
+ # Don't use sendall(), may end
+ # up in socket buffer deadlock
+ # scenario if the server is
+ # blocked because the client's
+ # receive buffer is full
+ sent = self.socket.send(buf)
+ except ssl.SSLWantReadError:
+ continue
+ except ssl.SSLWantWriteError:
+ continue
+ if sent < len(buf):
+ # Only part of the chunk was sent
+ outgoing[0] = buf[sent:]
+ else:
+ # Whole chunk sent
+ outgoing.popleft()
+ if not outgoing:
+ # Nothing more to send, stop writing
+ self.sel.modify(self.socket, selectors.EVENT_READ)
+ # Start checking the send queue again
+ self.sel.modify(self.send_queue, selectors.EVENT_READ)
+ elif key.fileobj == self.send_queue and mask & selectors.EVENT_READ:
+ encoded_frame = self.send_queue.get()
+ outgoing.append(encoded_frame)
+ # Now we have data to write to the socket
+ self.sel.modify(self.socket, selectors.EVENT_READ | selectors.EVENT_WRITE)
+ # Stop checking the send queue in order to
+ # not buffer too many messages
+ self.sel.modify(self.send_queue, 0)
except exception.ConnectionClosedException:
if self.running:
#
@@ -349,13 +392,19 @@
self.running = False
notify_disconnected = True
break
+ except Exception:
+ _, e, _ = sys.exc_info()
+ log.warning(e)
finally:
self.cleanup()
+ except Exception:
+ _, e, _ = sys.exc_info()
+ log.warning(e)
finally:
- with self.__receiver_thread_exit_condition:
- self.__receiver_thread_exited = True
- self.__receiver_thread_exit_condition.notify_all()
- logging.debug("receiver loop ended")
+ with self.__io_thread_exit_condition:
+ self.__io_thread_exited = True
+ self.__io_thread_exit_condition.notify_all()
+ log.debug("io loop ended")
self.notify("receiver_loop_completed")
if notify_disconnected and not self.notified_on_disconnect:
self.notify("disconnected")
@@ -363,6 +412,20 @@
self.__connect_wait_condition.notify_all()
self.notified_on_disconnect = False
+ def __read_frames(self):
+ frames = self.__read()
+
+ for frame in frames:
+ if self.__is_eol(frame):
+ f = HEARTBEAT_FRAME
+ else:
+ f = parse_frame(frame)
+ if f is None:
+ continue
+ if self.__auto_decode:
+ f.body = decode(f.body)
+ self.process_frame(f, frame)
+
def __read(self):
"""
Read the next frame(s) from the socket.
@@ -375,14 +438,22 @@
try:
try:
c = self.receive()
+ except ssl.SSLWantReadError:
+ break
+ except ssl.SSLWantWriteError:
+ break
except exception.InterruptedException:
- logging.debug("socket read interrupted, restarting")
+ log.debug("socket read interrupted, restarting")
continue
except Exception:
- logging.debug("socket read error", exc_info=logging.verbose)
+ log.debug("socket read error", exc_info=log.verbose)
c = b""
if c is None or len(c) == 0:
- logging.debug("nothing received, raising ConnectionClosedException")
+ if not self.blocking:
+ # recv() for TLS sockets can return None while the
+ # connection is still open.
+ break
+ log.debug("nothing received, raising ConnectionClosedException")
raise exception.ConnectionClosedException()
if self.__is_eol(c) and not self.__recvbuf and not fastbuf.tell():
#
@@ -502,13 +573,13 @@
vhost=None,
auto_decode=True,
encoding="utf-8",
- recv_bytes=1024,
+ recv_bytes=16384,
is_eol_fc=is_eol_default,
bind_host_port=None):
BaseTransport.__init__(self, auto_decode, encoding, is_eol_fc)
if host_and_ports is None:
- logging.debug("no hosts_and_ports specified, adding default localhost")
+ log.debug("no hosts_and_ports specified, adding default localhost")
host_and_ports = [("localhost", 61613)]
sorted_host_and_ports = []
@@ -577,7 +648,15 @@
"""
self.running = False
if self.socket is not None:
+ if not self.blocking:
+ self.shutdown_queue()
if self.__need_ssl():
+ # Do the SSL shutdown handshake in blocking mode
+ self.blocking = True
+ try:
+ self.socket.setblocking(self.blocking)
+ except Exception:
+ pass
#
# Even though we don't want to use the socket, unwrap is the
only API method which does a proper SSL
# shutdown
@@ -589,7 +668,7 @@
# unwrap seems flaky on Win with the back-ported ssl mod,
so catch any exception and log it
#
_, e, _ = sys.exc_info()
- logging.warning(e)
+ log.warning(e)
elif hasattr(socket, "SHUT_RDWR"):
try:
self.socket.shutdown(socket.SHUT_RDWR)
@@ -597,7 +676,7 @@
_, e, _ = sys.exc_info()
# ignore when socket already closed
if get_errno(e) != errno.ENOTCONN:
- logging.warning("unable to issue SHUT_RDWR on socket because of error '%s'", e)
+ log.warning("unable to issue SHUT_RDWR on socket because of error '%s'", e)
#
# split this into a separate check, because sometimes the socket is
nulled between shutdown and this call
@@ -607,7 +686,7 @@
self.socket.close()
except socket.error:
_, e, _ = sys.exc_info()
- logging.warning("unable to close socket because of error
'%s'", e)
+ log.warning("unable to close socket because of error '%s'", e)
self.current_host_and_port = None
self.socket = None
if not self.notified_on_disconnect:
@@ -623,7 +702,7 @@
self.socket.sendall(encoded_frame)
except Exception:
_, e, _ = sys.exc_info()
- logging.error("error sending frame", exc_info=True)
+ log.error("error sending frame", exc_info=True)
raise e
else:
raise exception.NotConnectedException()
@@ -637,7 +716,7 @@
except socket.error:
_, e, _ = sys.exc_info()
if get_errno(e) in (errno.EAGAIN, errno.EINTR):
- logging.debug("socket read interrupted, restarting")
+ log.debug("socket read interrupted, restarting")
raise exception.InterruptedException()
if self.is_connected():
raise
@@ -646,21 +725,31 @@
"""
Close the socket and clear the current host and port details.
"""
+ if not self.blocking:
+ self.shutdown_queue()
try:
self.socket.close()
except:
pass # ignore errors when attempting to close socket
self.socket = None
+ def shutdown_queue(self):
+ self.blocking = True
+ self.sel.unregister(self.socket)
+ self.sel.unregister(self.send_queue)
+ self.sel = None
+ self.send_queue.shutdown()
+ self.send_queue = None
+
def __enable_keepalive(self):
def try_setsockopt(sock, name, fam, opt, val=None):
if val is None:
return True # no value to set always works
try:
sock.setsockopt(fam, opt, val)
- logging.debug("keepalive: set %r option to %r on socket", name, val)
+ log.debug("keepalive: set %r option to %r on socket", name, val)
except:
- logging.error("keepalive: unable to set %r option to %r on socket", name, val)
+ log.error("keepalive: unable to set %r option to %r on socket", name, val)
return False
return True
@@ -677,26 +766,26 @@
ka_sig = ka[0]
ka_args = ka[1:]
except Exception:
- logging.error("keepalive: bad specification %r", ka)
+ log.error("keepalive: bad specification %r", ka)
return
if ka_sig == "auto":
if LINUX_KEEPALIVE_AVAIL:
ka_sig = "linux"
ka_args = None
- logging.debug("keepalive: autodetected linux-style support")
+ log.debug("keepalive: autodetected linux-style support")
elif MAC_KEEPALIVE_AVAIL:
ka_sig = "mac"
ka_args = None
- logging.debug("keepalive: autodetected mac-style support")
+ log.debug("keepalive: autodetected mac-style support")
else:
- logging.error("keepalive: unable to detect any implementation,
DISABLED!")
+ log.error("keepalive: unable to detect any implementation,
DISABLED!")
return
if ka_sig == "linux":
- logging.debug("keepalive: activating linux-style support")
+ log.debug("keepalive: activating linux-style support")
if ka_args is None:
- logging.debug("keepalive: using system defaults")
+ log.debug("keepalive: using system defaults")
ka_args = (None, None, None)
ka_idle, ka_intvl, ka_cnt = ka_args
if try_setsockopt(self.socket, "enable", SOL_SOCKET, SO_KEEPALIVE,
1):
@@ -704,15 +793,15 @@
try_setsockopt(self.socket, "interval", SOL_TCP,
TCP_KEEPINTVL, ka_intvl)
try_setsockopt(self.socket, "count", SOL_TCP, TCP_KEEPCNT,
ka_cnt)
elif ka_sig == "mac":
- logging.debug("keepalive: activating mac-style support")
+ log.debug("keepalive: activating mac-style support")
if ka_args is None:
- logging.debug("keepalive: using system defaults")
+ log.debug("keepalive: using system defaults")
ka_args = (3,)
ka_intvl = ka_args
if try_setsockopt(self.socket, "enable", SOL_SOCKET, SO_KEEPALIVE,
1):
try_setsockopt(self.socket, socket.IPPROTO_TCP, 0x10, ka_intvl)
else:
- logging.error("keepalive: implementation %r not recognized or not
supported", ka_sig)
+ log.error("keepalive: implementation %r not recognized or not
supported", ka_sig)
def attempt_connection(self):
"""
@@ -722,12 +811,12 @@
sleep_exp = 1
connect_count = 0
- logging.debug("attempt reconnection (%s, %s, %s)", self.running,
self.socket, connect_count)
+ log.debug("attempt reconnection (%s, %s, %s)", self.running,
self.socket, connect_count)
while self.running and self.socket is None and (connect_count <
self.__reconnect_attempts_max or
self.__reconnect_attempts_max == -1):
for host_and_port in self.__host_and_ports:
try:
- logging.debug("attempting connection to host %s, port %s",
host_and_port[0], host_and_port[1])
+ log.debug("attempting connection to host %s, port %s",
host_and_port[0], host_and_port[1])
if self.__bind_host_port:
self.socket = socket.create_connection(host_and_port,
self.__timeout, self.__bind_host_port)
else:
@@ -737,11 +826,14 @@
if need_ssl: # wrap socket
ssl_params = self.get_ssl(host_and_port)
- tls_context = ssl.SSLContext(DEFAULT_SSL_VERSION)
+ tls_context = ssl.SSLContext(ssl_params["ssl_version"])
if ssl_params["ca_certs"]:
- cert_validation = ssl.CERT_REQUIRED
tls_context.load_verify_locations(ssl_params["ca_certs"])
else:
+ tls_context.load_default_certs()
+ if ssl_params["verify"]:
+ cert_validation = ssl.CERT_REQUIRED
+ else:
cert_validation = ssl.CERT_NONE
if tls_context:
# Wrap the socket for TLS
@@ -755,11 +847,11 @@
if cert_validation is None or cert_validation ==
ssl.CERT_NONE:
tls_context.check_hostname = False
tls_context.verify_mode = cert_validation
- logging.debug("wrapping SSL socket")
+ log.debug("wrapping SSL socket")
self.socket = tls_context.wrap_socket(self.socket,
server_hostname=host_and_port[0])
else:
# Old-style wrap_socket where we don't have a
modern SSLContext (so no SNI)
- logging.debug("wrapping SSL socket (old style)")
+ log.debug("wrapping SSL socket (old style)")
self.socket = ssl.wrap_socket(
self.socket,
keyfile=ssl_params["key_file"],
@@ -767,11 +859,15 @@
cert_reqs=cert_validation,
ca_certs=ssl_params["ca_certs"],
ssl_version=ssl_params["ssl_version"])
+ # Handshake is done. Switch to non-blocking mode.
+ self.send_queue = PollableQueue()
+ self.sel = selectors.DefaultSelector()
+ self.sel.register(self.send_queue, selectors.EVENT_READ)
+ self.sel.register(self.socket, selectors.EVENT_READ)
+ self.blocking = False
self.socket.settimeout(self.__timeout)
-
- if self.blocking is not None:
- self.socket.setblocking(self.blocking)
+ self.socket.setblocking(self.blocking)
#
# Validate server cert
@@ -783,19 +879,19 @@
raise SSLError("Server certificate validation
failed: %s", errmsg)
self.current_host_and_port = host_and_port
- logging.info("established connection to host %s, port %s",
host_and_port[0], host_and_port[1])
+ log.info("established connection to host %s, port %s",
host_and_port[0], host_and_port[1])
break
except FileNotFoundError as err:
- logging.error("Could not find file %s", err.filename)
+ log.error("Could not find file %s", err.filename)
self.socket = None
break
- except (OSError, AssertionError):
+ except (OSError, AssertionError) as err:
self.socket = None
connect_count += 1
- logging.warning("could not connect to host %s, port %s", host_and_port[0], host_and_port[1],
- exc_info=logging.verbose)
+ log.warning("could not connect to host %s, port %s: %s", host_and_port[0], host_and_port[1],
+ err, exc_info=log.verbose)
if self.socket is None:
sleep_duration = (min(self.__reconnect_sleep_max,
@@ -803,7 +899,7 @@
* math.pow(1.0 +
self.__reconnect_sleep_increase, sleep_exp)))
* (1.0 + random.random() *
self.__reconnect_sleep_jitter))
sleep_end = monotonic() + sleep_duration
- logging.debug("sleeping for %.1f seconds before attempting
reconnect", sleep_duration)
+ log.debug("sleeping for %.1f seconds before attempting
reconnect", sleep_duration)
while self.running and monotonic() < sleep_end:
time.sleep(0.2)
@@ -820,7 +916,8 @@
ca_certs=None,
cert_validator=None,
ssl_version=DEFAULT_SSL_VERSION,
- password=None):
+ password=None,
+ verify=True):
"""
Sets up SSL configuration for the given hosts. This ensures socket is
wrapped in a SSL connection, raising an
exception if the SSL module can't be found.
@@ -848,6 +945,7 @@
cert_file=cert_file,
ca_certs=ca_certs,
cert_validator=cert_validator,
+ verify=verify,
ssl_version=ssl_version,
password=password)
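
Note on the transport.py change above: the new io loop writes with socket.send() rather than sendall() and keeps any unsent remainder at the head of a deque. The following is a minimal sketch of that partial-send pattern only, not the upstream implementation. The helper name send_first_chunk is hypothetical, and the sketch treats BlockingIOError like the SSLWant* errors so it also works on a plain non-blocking socket (the diff itself only catches the two ssl exceptions).

```python
import socket
import ssl
from collections import deque


def send_first_chunk(sock, outgoing):
    """Try to send the chunk at the head of `outgoing`; return bytes sent.

    Hypothetical helper for illustration. `outgoing` is a deque of bytes.
    """
    if not outgoing:
        return 0
    buf = outgoing[0]
    try:
        # send() may accept only part of the buffer on a non-blocking socket
        sent = sock.send(buf)
    except (BlockingIOError, ssl.SSLWantReadError, ssl.SSLWantWriteError):
        # Socket not ready; keep the chunk queued and retry later
        return 0
    if sent < len(buf):
        # Only part of the chunk was sent; keep the remainder at the head
        outgoing[0] = buf[sent:]
    else:
        # Whole chunk sent
        outgoing.popleft()
    return sent
```

In a selector-driven loop this helper would be called once per EVENT_WRITE notification, so a slow peer never blocks the thread that also has to read.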
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/stomp_py-8.2.0/stomp/utils.py
new/stomp_py-9.0.0/stomp/utils.py
--- old/stomp_py-8.2.0/stomp/utils.py 2022-11-13 17:44:23.511556000 +0100
+++ new/stomp_py-9.0.0/stomp/utils.py 1970-01-01 01:00:00.000000000 +0100
@@ -4,6 +4,7 @@
import copy
import os
import re
+import queue
import socket
import threading
import uuid
@@ -354,3 +355,41 @@
HEARTBEAT_FRAME = Frame("heartbeat")
+
+
+# A selectors-friendly queue based on the post
+# https://stackoverflow.com/questions/17495877/python-how-to-wait-on-both-queue-and-a-socket-on-same-time
+# but appears to originate from "Python Cookbook: Recipes for Mastering Python 3"
+# by David Beazley and Brian K. Jones.
+class PollableQueue(queue.Queue):
+
+ def __init__(self):
+ super().__init__()
+ # Create a pair of connected sockets
+ if os.name == 'posix':
+ self._putsocket, self._getsocket = socket.socketpair()
+ else:
+ # Compatibility on non-POSIX systems
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.bind(('127.0.0.1', 0))
+ server.listen(1)
+ self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._putsocket.connect(server.getsockname())
+ self._getsocket, _ = server.accept()
+ server.close()
+
+ def fileno(self):
+ return self._getsocket.fileno()
+
+ def put(self, item):
+ super().put(item)
+ # Might block if the consumer is slow
+ self._putsocket.send(b'x')
+
+ def get(self):
+ self._getsocket.recv(1)
+ return super().get()
+
+ def shutdown(self):
+ self._putsocket.close()
+ self._getsocket.close()
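
Note on the PollableQueue added above: because it exposes fileno(), a selector can wait on the queue alongside a socket. The sketch below is a simplified, self-contained variant for illustration, not the upstream class. It assumes socket.socketpair() is available on the platform (it is on POSIX, and on Windows since Python 3.5), names the cleanup method close() so it does not shadow queue.Queue.shutdown() from Python 3.13, and uses a hypothetical helper drain_ready.

```python
import queue
import selectors
import socket


class PollableQueue(queue.Queue):
    """Queue whose readiness can be observed by a selector (sketch)."""

    def __init__(self):
        super().__init__()
        # One byte travels over this pair for every queued item
        self._putsocket, self._getsocket = socket.socketpair()

    def fileno(self):
        # Selectors accept any object with a fileno() method
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b"x")  # wake up the selector

    def get(self):
        self._getsocket.recv(1)  # consume the wake-up byte
        return super().get()

    def close(self):
        self._putsocket.close()
        self._getsocket.close()


def drain_ready(sel, q, timeout=1.0):
    """Return the items taken from `q` during one select() call."""
    items = []
    for key, mask in sel.select(timeout):
        if key.fileobj is q and mask & selectors.EVENT_READ:
            items.append(q.get())
    return items
```

Each select() call reports the queue as readable at most once, so one item is taken per call; this matches the io loop in the diff, which moves one frame at a time from the send queue to its outgoing deque.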