Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package python-streamz for openSUSE:Factory checked in at 2026-04-28 11:56:10 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-streamz (Old) and /work/SRC/openSUSE:Factory/.python-streamz.new.11940 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-streamz" Tue Apr 28 11:56:10 2026 rev:14 rq:1349439 version:0.6.6 Changes: -------- --- /work/SRC/openSUSE:Factory/python-streamz/python-streamz.changes 2025-01-22 16:39:32.789915584 +0100 +++ /work/SRC/openSUSE:Factory/.python-streamz.new.11940/python-streamz.changes 2026-04-28 12:00:03.238693519 +0200 @@ -1,0 +2,16 @@ +Sun Apr 26 20:05:05 UTC 2026 - Dirk Müller <[email protected]> + +- update to 0.6.6: + * Use importlib.metadata instead of pkg_resources + * Support for Python 3.10+ + * handle username and pw for mqtt source + * remove six dependency + * hybridize usage of importlib.metadata for Python3.10+ + * CI: Add option to invoke workflow manually +- Remove obsolete patches: + * streamz-pr455-ci-fixes.patch (integrated upstream) + * python-streamz-no-six.patch (integrated upstream) +- Refresh streamz-opensuse-python-exec.patch +- Update BuildRequires to require python base >= 3.10 + +------------------------------------------------------------------- Old: ---- python-streamz-no-six.patch streamz-0.6.4.tar.gz streamz-pr455-ci-fixes.patch New: ---- streamz-0.6.6.tar.gz ----------(Old B)---------- Old: * streamz-pr455-ci-fixes.patch (integrated upstream) * python-streamz-no-six.patch (integrated upstream) - Refresh streamz-opensuse-python-exec.patch Old:- Remove obsolete patches: * streamz-pr455-ci-fixes.patch (integrated upstream) * python-streamz-no-six.patch (integrated upstream) ----------(Old E)---------- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-streamz.spec ++++++ --- /var/tmp/diff_new_pack.9T0ulg/_old 2026-04-28 12:00:04.398742248 +0200 +++ /var/tmp/diff_new_pack.9T0ulg/_new 2026-04-28 12:00:04.402742416 +0200 @@ -1,7 +1,7 @@ # # spec file for package python-streamz # -# Copyright (c) 2025 SUSE LLC +# Copyright (c) 2026 SUSE LLC and contributors # # All modifications and additions to the file contributed by third parties # remain the property of their 
copyright owners, unless otherwise agreed @@ -17,26 +17,20 @@ Name: python-streamz -Version: 0.6.4 +Version: 0.6.6 Release: 0 Summary: Tool to build continuous data pipelines License: BSD-3-Clause URL: https://github.com/python-streamz/streamz/ Source: https://files.pythonhosted.org/packages/source/s/streamz/streamz-%{version}.tar.gz -# PATCH-FIX-UPSTREAM streamz-pr455-ci-fixes.patch gh#python-streamz/streamz#455 -Patch0: streamz-pr455-ci-fixes.patch # PATCH-FIX-OPENSUSE streamz-opensuse-python-exec.patch -- call tests with correct flavor -Patch1: streamz-opensuse-python-exec.patch -# PATCH-FIX-UPSTREAM python-streamz-no-six.patch gh#python-streamz/streamz/commit/33f49417b415deb7ea3c495a404b78c9d3743c03 -Patch2: python-streamz-no-six.patch -BuildRequires: %{python_module base >= 3.8} +Patch0: streamz-opensuse-python-exec.patch +BuildRequires: %{python_module base >= 3.10} BuildRequires: %{python_module pip} BuildRequires: %{python_module setuptools} BuildRequires: %{python_module wheel} BuildRequires: fdupes BuildRequires: python-rpm-macros -# Setuptools is a runtime requirement because of pkg_resources usage -Requires: python-setuptools Requires: python-toolz Requires: python-tornado Requires: python-zict ++++++ streamz-0.6.4.tar.gz -> streamz-0.6.6.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/PKG-INFO new/streamz-0.6.6/PKG-INFO --- old/streamz-0.6.4/PKG-INFO 2022-07-27 20:07:33.221615000 +0200 +++ new/streamz-0.6.6/PKG-INFO 2026-04-07 16:12:02.136303400 +0200 @@ -1,15 +1,27 @@ -Metadata-Version: 2.1 +Metadata-Version: 2.4 Name: streamz -Version: 0.6.4 +Version: 0.6.6 Summary: Streams Home-page: http://github.com/python-streamz/streamz/ Maintainer: Matthew Rocklin Maintainer-email: [email protected] License: BSD Keywords: streams -Platform: UNKNOWN -Requires-Python: >=3.7 +Requires-Python: >=3.10 License-File: LICENSE.txt +Requires-Dist: tornado +Requires-Dist: toolz +Requires-Dist: zict 
+Dynamic: description +Dynamic: home-page +Dynamic: keywords +Dynamic: license +Dynamic: license-file +Dynamic: maintainer +Dynamic: maintainer-email +Dynamic: requires-dist +Dynamic: requires-python +Dynamic: summary Streamz ======= @@ -36,5 +48,3 @@ :target: https://pypi.python.org/pypi/streamz/ .. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py - - diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/docs/source/api.rst new/streamz-0.6.6/docs/source/api.rst --- old/streamz-0.6.4/docs/source/api.rst 2021-11-01 21:26:17.000000000 +0100 +++ new/streamz-0.6.6/docs/source/api.rst 2026-04-07 16:10:13.000000000 +0200 @@ -22,6 +22,7 @@ filter flatten map + map_async partition rate_limit scatter diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/docs/source/async.rst new/streamz-0.6.6/docs/source/async.rst --- old/streamz-0.6.4/docs/source/async.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.6/docs/source/async.rst 2026-04-07 16:10:13.000000000 +0200 @@ -72,8 +72,8 @@ .. code-block:: python + import asyncio from streamz import Stream - from tornado.ioloop import IOLoop async def f(): source = Stream(asynchronous=True) # tell the stream we're working asynchronously @@ -82,7 +82,28 @@ for x in range(10): await source.emit(x) - IOLoop().run_sync(f) + asyncio.run(f()) + +When working asynchronously, we can also map asynchronous functions. + +.. code-block:: python + + async def increment_async(x): + """ A "long-running" increment function + + Simulates a function that does real asyncio work. 
+ """ + await asyncio.sleep(0.1) + return x + 1 + + async def f_inc(): + source = Stream(asynchronous=True) # tell the stream we're working asynchronously + source.map_async(increment_async).rate_limit(0.500).sink(write) + + for x in range(10): + await source.emit(x) + + asyncio.run(f_inc()) Event Loop on a Separate Thread diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/docs/source/conf.py new/streamz-0.6.6/docs/source/conf.py --- old/streamz-0.6.4/docs/source/conf.py 2021-10-04 17:35:30.000000000 +0200 +++ new/streamz-0.6.6/docs/source/conf.py 2026-04-07 16:10:13.000000000 +0200 @@ -163,6 +163,3 @@ author, 'Streamz', 'Support for pipelines managing continuous streams of data.', 'Miscellaneous'), ] - - - diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/docs/source/index.rst new/streamz-0.6.6/docs/source/index.rst --- old/streamz-0.6.4/docs/source/index.rst 2020-12-01 17:08:16.000000000 +0100 +++ new/streamz-0.6.6/docs/source/index.rst 2025-12-18 17:57:14.000000000 +0100 @@ -123,3 +123,4 @@ async.rst plotting.rst plugins.rst + use-cases.rst diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/docs/source/use-cases.rst new/streamz-0.6.6/docs/source/use-cases.rst --- old/streamz-0.6.4/docs/source/use-cases.rst 1970-01-01 01:00:00.000000000 +0100 +++ new/streamz-0.6.6/docs/source/use-cases.rst 2025-12-18 17:57:14.000000000 +0100 @@ -0,0 +1,28 @@ +Use cases +========= + +ETL applications +---------------- + + In LorryStream_, we use Streamz at the core for relaying data from streaming + sources into CrateDB_, because we have been looking for something smaller + and more concise than Beam, Flink, or Spark. 
Streamz gives us the freedom + to use advanced stream processing and flow control primitives within Python + applications to feed databases, without the need to spin up compute clusters, + or deal with Java class paths. + +In this spirit, LorryStream is effectively just a humble CLI interface for +Streamz. + +Telemetry readings +------------------ + + At my company, we use it at two different points in our data processing to + create a disjoint subcover of a noisy stream of telemetry readings and then + blend in metadata from users and external authoritative resources to give + users an understanding of what (and what kinds of) events are happening in + their space. + + +.. _CrateDB: https://github.com/crate/crate +.. _LorryStream: https://lorrystream.readthedocs.io/ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/requirements.txt new/streamz-0.6.6/requirements.txt --- old/streamz-0.6.4/requirements.txt 2020-12-01 17:08:16.000000000 +0100 +++ new/streamz-0.6.6/requirements.txt 2026-04-07 16:10:13.000000000 +0200 @@ -1,5 +1,3 @@ tornado toolz zict -six -setuptools \ No newline at end of file diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/setup.py new/streamz-0.6.6/setup.py --- old/streamz-0.6.4/setup.py 2022-07-27 20:06:45.000000000 +0200 +++ new/streamz-0.6.6/setup.py 2026-04-07 16:11:33.000000000 +0200 @@ -9,7 +9,7 @@ setup(name='streamz', - version='0.6.4', + version='0.6.6', description='Streams', url='http://github.com/python-streamz/streamz/', maintainer='Matthew Rocklin', @@ -17,7 +17,7 @@ license='BSD', keywords='streams', packages=packages + tests, - python_requires='>=3.7', + python_requires='>=3.10', long_description=(open('README.rst').read() if exists('README.rst') else ''), install_requires=list(open('requirements.txt').read().strip().split('\n')), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' 
old/streamz-0.6.4/streamz/__init__.py new/streamz-0.6.6/streamz/__init__.py --- old/streamz-0.6.4/streamz/__init__.py 2022-07-27 20:06:14.000000000 +0200 +++ new/streamz-0.6.6/streamz/__init__.py 2026-04-07 16:11:33.000000000 +0200 @@ -13,4 +13,4 @@ except ImportError: pass -__version__ = '0.6.4' +__version__ = '0.6.6' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/core.py new/streamz-0.6.6/streamz/core.py --- old/streamz-0.6.4/streamz/core.py 2021-12-30 22:18:19.000000000 +0100 +++ new/streamz-0.6.6/streamz/core.py 2026-04-07 16:10:13.000000000 +0200 @@ -1,13 +1,13 @@ import asyncio +import concurrent.futures from collections import deque, defaultdict from datetime import timedelta +from itertools import chain import functools import logging -import six -import sys import threading from time import time -from typing import Any, Callable, Hashable, Union +from typing import Any, Callable, Coroutine, Hashable, Tuple, Union, overload import weakref import toolz @@ -52,7 +52,7 @@ return client.loop if not _io_loops: - loop = IOLoop() + loop = IOLoop(make_current=False) thread = threading.Thread(target=loop.start) thread.daemon = True thread.start() @@ -168,7 +168,7 @@ def register_plugin_entry_point(cls, entry_point, modifier=identity): if hasattr(cls, entry_point.name): raise ValueError( - f"Can't add {entry_point.name} from {entry_point.module_name} " + f"Can't add {entry_point.name} " f"to {cls.__name__}: duplicate method name." 
) @@ -178,7 +178,6 @@ if not issubclass(node, Stream): raise TypeError( f"Error loading {entry_point.name} " - f"from module {entry_point.module_name}: " f"{node.__class__.__name__} must be a subclass of Stream" ) if getattr(cls, entry_point.name).__name__ == "stub": @@ -379,13 +378,14 @@ __repr__ = __str__ def _ipython_display_(self, **kwargs): # pragma: no cover + # Since this function is only called by jupyter, this import must succeed + from IPython.display import HTML, display + try: import ipywidgets from IPython.core.interactiveshell import InteractiveShell output = ipywidgets.Output(_view_count=0) except ImportError: - # since this function is only called by jupyter, this import must succeed - from IPython.display import display, HTML if hasattr(self, '_repr_html_'): return display(HTML(self._repr_html_())) else: @@ -420,7 +420,11 @@ output.observe(remove_stream, '_view_count') - return output._ipython_display_(**kwargs) + if hasattr(output, "_repr_mimebundle_"): + data = output._repr_mimebundle_(**kwargs) + return display(data, raw=True) + else: + return output._ipython_display_(**kwargs) def _emit(self, x, metadata=None): """ @@ -716,6 +720,122 @@ @Stream.register_api() +class map_async(Stream): + """ Apply an async function to every element in the stream, preserving order + even when evaluating multiple inputs in parallel. + + Parameters + ---------- + func: async callable + *args : + The arguments to pass to the function. + parallelism: + The maximum number of parallel Tasks for evaluating func, default value is 1 + stop_on_exception: + If the mapped func raises an exception, should the stream stop or not. Default value is False. + **kwargs: + Keyword arguments to pass to func + + Examples + -------- + >>> async def mult(x, factor=1): + ... return factor*x + >>> async def run(): + ... source = Stream(asynchronous=True) + ... source.map_async(mult, factor=2).sink(print) + ... for i in range(5): + ... 
await source.emit(i) + >>> asyncio.run(run()) + 0 + 2 + 4 + 6 + 8 + """ + def __init__(self, upstream, func, *args, parallelism=1, stop_on_exception=False, **kwargs): + self.func = func + stream_name = kwargs.pop('stream_name', None) + self.kwargs = kwargs + self.args = args + self.stop_on_exception = stop_on_exception + self.work_queue = asyncio.Queue(maxsize=parallelism) + + Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True) + self.work_task = None + + def _create_work_task(self) -> Tuple[asyncio.Event, asyncio.Task[None]]: + stop_work = asyncio.Event() + work_task = self._create_task(self.work_callback(stop_work)) + return stop_work, work_task + + def start(self): + if self.work_task: + stop_work, _ = self.work_task + stop_work.set() + self.work_task = self._create_work_task() + super().start() + + def stop(self): + stop_work, _ = self.work_task + stop_work.set() + self.work_task = None + super().stop() + + def update(self, x, who=None, metadata=None): + if not self.work_task: + self.work_task = self._create_work_task() + return self._create_task(self._insert_job(x, metadata)) + + @overload + def _create_task(self, coro: asyncio.Future) -> asyncio.Future: + ... + + @overload + def _create_task(self, coro: concurrent.futures.Future) -> concurrent.futures.Future: + ... + + @overload + def _create_task(self, coro: Coroutine) -> asyncio.Task: + ... 
+ + def _create_task(self, coro): + if gen.is_future(coro): + return coro + return self.loop.asyncio_loop.create_task(coro) + + async def work_callback(self, stop_work: asyncio.Event): + while not stop_work.is_set(): + task, metadata = await self.work_queue.get() + self.work_queue.task_done() + try: + result = await task + except Exception as e: + logger.exception(e) + if self.stop_on_exception: + self.stop() + else: + results = self._emit(result, metadata=metadata) + if results: + await asyncio.gather(*results) + self._release_refs(metadata) + + async def _wait_for_work_slot(self): + while self.work_queue.full(): + await asyncio.sleep(0) + + async def _insert_job(self, x, metadata): + try: + await self._wait_for_work_slot() + coro = self.func(x, *self.args, **self.kwargs) + task = self._create_task(coro) + await self.work_queue.put((task, metadata)) + self._retain_refs(metadata) + except Exception as e: + logger.exception(e) + raise + + +@Stream.register_api() class starmap(Stream): """ Apply a function to every element in the stream, splayed out @@ -1468,18 +1588,23 @@ def __init__(self, *upstreams, **kwargs): self.maxsize = kwargs.pop('maxsize', 10) - self.condition = Condition() + self._condition = None self.literals = [(i, val) for i, val in enumerate(upstreams) if not isinstance(val, Stream)] self.buffers = {upstream: deque() for upstream in upstreams if isinstance(upstream, Stream)} - upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)] Stream.__init__(self, upstreams=upstreams2, **kwargs) + @property + def condition(self): + if self._condition is None: + self._condition = Condition() + return self._condition + def _add_upstream(self, upstream): # Override method to handle setup of buffer for new stream self.buffers[upstream] = deque() @@ -1628,15 +1753,23 @@ """ def update(self, x, who=None, metadata=None): L = [] - for i, item in enumerate(x): - if i == len(x) - 1: - y = self._emit(item, metadata=metadata) - else: - y =
self._emit(item) + items = chain(x) + try: + item = next(items) + except StopIteration: + return L + for item_next in items: + y = self._emit(item) + item = item_next if type(y) is list: L.extend(y) else: L.append(y) + y = self._emit(item, metadata=metadata) + if type(y) is list: + L.extend(y) + else: + L.append(y) return L @@ -1876,7 +2009,7 @@ _graphviz_shape = 'octagon' def __init__(self, upstream, **kwargs): - self.condition = Condition() + self._condition = None self.next = [] self.next_metadata = None @@ -1885,6 +2018,12 @@ self.loop.add_callback(self.cb) + @property + def condition(self): + if self._condition is None: + self._condition = Condition() + return self._condition + def update(self, x, who=None, metadata=None): if self.next_metadata: self._release_refs(self.next_metadata) @@ -1926,8 +2065,8 @@ if timeout is not None: future = gen.with_timeout(timedelta(seconds=timeout), future) result[0] = yield future - except Exception: - error[0] = sys.exc_info() + except Exception as exc: + error[0] = exc finally: thread_state.asynchronous = False e.set() @@ -1939,7 +2078,8 @@ else: while not e.is_set(): e.wait(10) + if error[0]: - six.reraise(*error[0]) + raise error[0] else: return result[0] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py new/streamz-0.6.6/streamz/dataframe/tests/test_dataframes.py --- old/streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py 2021-04-05 18:03:49.000000000 +0200 +++ new/streamz-0.6.6/streamz/dataframe/tests/test_dataframes.py 2026-04-07 16:10:13.000000000 +0200 @@ -8,6 +8,7 @@ from dask.dataframe.utils import assert_eq import numpy as np import pandas as pd +from flaky import flaky from tornado import gen from streamz import Stream @@ -219,7 +220,7 @@ a.emit(df) - assert_eq(b[0], expected) + wait_for(lambda: b and b[0].equals(expected), 1) def test_index(stream): @@ -246,7 +247,7 @@ a.emit(df.iloc[:5]) a.emit(df.iloc[5:]) - 
assert len(L) == 2 + wait_for(lambda: len(L) == 2, 1) assert_eq(pd.concat(L, axis=0), (df.x + df.y) * 2) @@ -259,7 +260,7 @@ a.emit(df.iloc[:5]) a.emit(df.iloc[5:]) - assert len(L) == 2 + wait_for(lambda: len(L) == 2, 1) assert_eq(pd.concat(L, axis=0), df[df.x > 4]) @@ -298,6 +299,7 @@ a.emit(df.iloc[7:]) first = df.iloc[:3] + wait_for(lambda: len(L) > 2, 1) assert assert_eq(L[0], f(first)) assert assert_eq(L[-1], f(df)) @@ -382,7 +384,7 @@ df['a'] = 10 df[['c', 'd']] = df[['x', 'y']] - assert_eq(L[-1], df.mean()) + wait_for(lambda: L and L[-1].equals(df.mean()), 1) def test_setitem_overwrites(stream): @@ -569,6 +571,7 @@ assert_eq(pd.concat(L), expected) +@flaky(max_runs=3, min_passes=1) @gen_test() def test_gc(): sdf = sd.Random(freq='5ms', interval='100ms') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/orderedweakset.py new/streamz-0.6.6/streamz/orderedweakset.py --- old/streamz-0.6.4/streamz/orderedweakset.py 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.6/streamz/orderedweakset.py 2025-12-17 22:16:22.000000000 +0100 @@ -26,6 +26,9 @@ def discard(self, value): self._od.pop(value, None) + def copy(self): + return OrderedSet(self._od.copy()) + class OrderedWeakrefSet(weakref.WeakSet): def __init__(self, values=()): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/plugins.py new/streamz-0.6.6/streamz/plugins.py --- old/streamz-0.6.4/streamz/plugins.py 2020-12-01 17:08:16.000000000 +0100 +++ new/streamz-0.6.6/streamz/plugins.py 2025-12-17 20:33:17.000000000 +0100 @@ -1,6 +1,6 @@ import warnings -import pkg_resources +import importlib.metadata def try_register(cls, entry_point, *modifier): @@ -13,10 +13,19 @@ ) +def get_entry_point(eps, group): + if hasattr(eps, "select"): # Python 3.10+ / importlib_metadata >= 3.9.0 + return eps.select(group=group) + else: + return eps.get(group, []) + + def load_plugins(cls): - for 
entry_point in pkg_resources.iter_entry_points("streamz.sources"): + eps = importlib.metadata.entry_points() + + for entry_point in get_entry_point(eps, "streamz.sources"): try_register(cls, entry_point, staticmethod) - for entry_point in pkg_resources.iter_entry_points("streamz.nodes"): + for entry_point in get_entry_point(eps, "streamz.nodes"): try_register(cls, entry_point) - for entry_point in pkg_resources.iter_entry_points("streamz.sinks"): + for entry_point in get_entry_point(eps, "streamz.sinks"): try_register(cls, entry_point) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/sources.py new/streamz-0.6.6/streamz/sources.py --- old/streamz-0.6.4/streamz/sources.py 2021-11-02 14:02:22.000000000 +0100 +++ new/streamz-0.6.6/streamz/sources.py 2026-04-07 16:10:13.000000000 +0200 @@ -3,6 +3,8 @@ import queue import os import time +from inspect import isawaitable + from tornado import gen import weakref @@ -252,7 +254,9 @@ while not self.source.stopped: try: data = await stream.read_until(self.source.delimiter) - await self.source._emit(data) + result = self.source._emit(data) + if isawaitable(result): + await result except StreamClosedError: break @@ -786,6 +790,8 @@ if self.stopped: break await asyncio.gather(*self._emit(x)) + if self.stopped: + break self.stopped = True @@ -896,12 +902,15 @@ :param client_kwargs: Passed to the client's ``connect()`` method """ - def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs): + def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, + user=None, pw=None, **kwargs): self.host = host self.port = port self.keepalive = keepalive self.topic = topic self.client_kwargs = client_kwargs + self.user = user + self.pw = pw super().__init__(q=queue.Queue(), **kwargs) def _on_connect(self, client, userdata, flags, rc): @@ -913,6 +922,8 @@ async def run(self): import paho.mqtt.client as mqtt client = mqtt.Client() + if 
self.user: + client.username_pw_set(self.user, self.pw) client.on_connect = self._on_connect client.on_message = self._on_message client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {})) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/tests/py3_test_core.py new/streamz-0.6.6/streamz/tests/py3_test_core.py --- old/streamz-0.6.4/streamz/tests/py3_test_core.py 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.6/streamz/tests/py3_test_core.py 2025-12-15 21:33:18.000000000 +0100 @@ -1,16 +1,16 @@ # flake8: noqa +import asyncio from time import time -from distributed.utils_test import loop, inc # noqa -from tornado import gen +from distributed.utils_test import inc # noqa from streamz import Stream -def test_await_syntax(loop): # noqa +def test_await_syntax(): # noqa L = [] async def write(x): - await gen.sleep(0.1) + await asyncio.sleep(0.1) L.append(x) async def f(): @@ -25,4 +25,4 @@ assert 0.2 < stop - start < 0.4 assert 2 <= len(L) <= 4 - loop.run_sync(f) + asyncio.run(f()) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/tests/test_core.py new/streamz-0.6.6/streamz/tests/test_core.py --- old/streamz-0.6.4/streamz/tests/test_core.py 2022-07-27 20:06:05.000000000 +0200 +++ new/streamz-0.6.6/streamz/tests/test_core.py 2026-04-07 16:10:13.000000000 +0200 @@ -1,3 +1,4 @@ +import asyncio from datetime import timedelta from functools import partial import itertools @@ -12,6 +13,7 @@ from tornado.queues import Queue from tornado.ioloop import IOLoop +from tornado import gen import streamz as sz @@ -19,7 +21,7 @@ from streamz.sources import sink_to_file from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401 clean, await_for, metadata, wait_for) # noqa: F401 -from distributed.utils_test import loop # noqa: F401 +from distributed.utils_test import loop, loop_in_thread, 
cleanup # noqa: F401 def test_basic(): @@ -124,6 +126,78 @@ assert L[0] == 11 +@gen_test() +def test_map_async_tornado(): + @gen.coroutine + def add_tor(x=0, y=0): + return x + y + + async def add_native(x=0, y=0): + await asyncio.sleep(0.1) + return x + y + + source = Stream(asynchronous=True) + L = source.map_async(add_tor, y=1).map_async(add_native, parallelism=2, y=2).buffer(1).sink_to_list() + + start = time() + yield source.emit(0) + yield source.emit(1) + yield source.emit(2) + + def fail_func(): + assert L == [3, 4, 5] + + yield await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func) + assert (time() - start) == pytest.approx(0.1, abs=4e-3) + + +@pytest.mark.asyncio +async def test_map_async_restart(): + async def flake_out(x): + if x == 2: + raise RuntimeError("I fail on 2.") + if x > 4: + raise RuntimeError("I fail on > 4.") + return x + + source = Stream.from_iterable(itertools.count()) + mapped = source.map_async(flake_out, stop_on_exception=True) + results = mapped.sink_to_list() + source.start() + + await await_for(lambda: results == [0, 1], 1) + await await_for(lambda: not mapped.work_task, 1) + + source.start() + + await await_for(lambda: results == [0, 1, 3, 4], 1) + + +@pytest.mark.asyncio +async def test_map_async(): + @gen.coroutine + def add_tor(x=0, y=0): + return x + y + + async def add_native(x=0, y=0): + await asyncio.sleep(0.1) + return x + y + + source = Stream(asynchronous=True) + L = source.map_async(add_tor, y=1).map_async(add_native, parallelism=2, y=2).sink_to_list() + + start = time() + await source.emit(0) + await source.emit(1) + await source.emit(2) + + def fail_func(): + assert L == [3, 4, 5] + + await await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func) + assert (time() - start) == pytest.approx(0.1, abs=4e-3) + + def test_map_args(): source = Stream() L = source.map(operator.add, 10).sink_to_list() @@ -442,7 +516,8 @@ ] -def test_timed_window_timedelta(clean): # noqa: F811 +@pytest.mark.asyncio +async def
test_timed_window_timedelta(clean): # noqa: F811 pytest.importorskip('pandas') source = Stream(asynchronous=True) a = source.timed_window('10ms') @@ -798,17 +873,30 @@ assert L[-1] == {'a': 2, 'b': 1} -def test_flatten(): +@pytest.mark.parametrize("iterators", + [[[1, 2, 3], [4, 5], [6, 7, 8]], + [(i for i in range(1, 7)), (i for i in range(7, 9))]]) +def test_flatten(iterators): source = Stream() L = source.flatten().sink_to_list() - source.emit([1, 2, 3]) - source.emit([4, 5]) - source.emit([6, 7, 8]) + for iterator in iterators: + source.emit(iterator) assert L == [1, 2, 3, 4, 5, 6, 7, 8] +def test_flatten_empty(): + source = Stream() + L = source.flatten().sink_to_list() + + source.emit([1, 2]) + source.emit([]) + source.emit([3, 4]) + + assert L == [1, 2, 3, 4] + + def test_unique(): source = Stream() L = source.unique().sink_to_list() @@ -1485,20 +1573,6 @@ sin.emit(1) -@pytest.fixture -def thread(loop): # noqa: F811 - from threading import Thread, Event - thread = Thread(target=loop.start) - thread.daemon = True - thread.start() - - event = Event() - loop.add_callback(event.set) - event.wait() - - return thread - - def test_percolate_loop_information(clean): # noqa: F811 source = Stream() assert not source.loop @@ -1506,16 +1580,6 @@ assert source.loop is s.loop -def test_separate_thread_without_time(loop, thread): # noqa: F811 - assert thread.is_alive() - source = Stream(loop=loop) - L = source.map(inc).sink_to_list() - - for i in range(10): - source.emit(i) - assert L[-1] == i + 1 - - def test_separate_thread_with_time(clean): # noqa: F811 L = [] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/tests/test_dask.py new/streamz-0.6.6/streamz/tests/test_dask.py --- old/streamz-0.6.4/streamz/tests/test_dask.py 2021-12-30 22:18:19.000000000 +0100 +++ new/streamz-0.6.6/streamz/tests/test_dask.py 2025-12-15 21:33:18.000000000 +0100 @@ -72,10 +72,10 @@ assert L == [1, 2, 3] -def
test_partition_then_scatter_sync(loop): +def test_partition_then_scatter_sync(): # Ensure partition w/ timeout before scatter works correctly for synchronous with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: # noqa: F841 + with Client(s['address']) as client: # noqa: F841 start = time.monotonic() source = Stream() L = source.partition(2, timeout=.1).scatter().map( @@ -164,9 +164,9 @@ assert L[-1][1] == 3 -def test_sync(loop): # noqa: F811 +def test_sync(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: # noqa: F841 + with Client(s['address']) as client: # noqa: F841 source = Stream() L = source.scatter().map(inc).gather().sink_to_list() @@ -174,14 +174,14 @@ for i in range(10): await source.emit(i, asynchronous=True) - sync(loop, f) + sync(client.loop, f) assert L == list(map(inc, range(10))) -def test_sync_2(loop): # noqa: F811 +def test_sync_2(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop): # noqa: F841 + with Client(s['address']): # noqa: F841 source = Stream() L = source.scatter().map(inc).gather().sink_to_list() @@ -218,9 +218,9 @@ assert source.loop == c.loop -def test_buffer_sync(loop): # noqa: F811 +def test_buffer_sync(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as c: # noqa: F841 + with Client(s['address']) as c: # noqa: F841 source = Stream() buff = source.scatter().map(slowinc, delay=0.5).buffer(5) L = buff.gather().sink_to_list() @@ -241,10 +241,11 @@ assert L == list(map(inc, range(10))) +@pytest.mark.asyncio @pytest.mark.xfail(reason='') -async def test_stream_shares_client_loop(loop): # noqa: F811 +async def test_stream_shares_client_loop(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: # noqa: F841 + with Client(s['address']) as client: # noqa: F841 source = Stream() d = source.timed_window('20ms').scatter() # noqa: F841 assert source.loop is
client.loop diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/tests/test_kafka.py new/streamz-0.6.6/streamz/tests/test_kafka.py --- old/streamz-0.6.4/streamz/tests/test_kafka.py 2022-07-27 20:06:05.000000000 +0200 +++ new/streamz-0.6.6/streamz/tests/test_kafka.py 2025-12-15 21:33:18.000000000 +0100 @@ -18,7 +18,8 @@ from distributed.utils_test import gen_cluster # flake8: noqa KAFKA_FILE = 'kafka_2.11-1.0.0' -LAUNCH_KAFKA = os.environ.get('STREAMZ_LAUNCH_KAFKA', '') == 'true' +if os.environ.get('STREAMZ_LAUNCH_KAFKA', '') != 'true': + pytest.skip("Not doing flaky kafka tests", allow_module_level=True) ck = pytest.importorskip('confluent_kafka') @@ -51,12 +52,12 @@ def launch_kafka(): stop_docker(let_fail=True) - subprocess.call(shlex.split("docker pull spotify/kafka")) + subprocess.call(shlex.split("docker pull spotify/kafka"), stderr=subprocess.DEVNULL) cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env " "ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 " "--name streamz-kafka spotify/kafka") - print(cmd) - cid = subprocess.check_output(shlex.split(cmd)).decode()[:-1] + cid = subprocess.check_output(shlex.split(cmd), + stderr=subprocess.DEVNULL).decode()[:-1] def end(): if cid: @@ -66,11 +67,11 @@ def predicate(): try: out = subprocess.check_output(['docker', 'logs', cid], - stderr=subprocess.STDOUT) - return b'kafka entered RUNNING state' in out + stderr=subprocess.STDOUT) + return b'RUNNING' in out except subprocess.CalledProcessError: pass - wait_for(predicate, 10, period=0.1) + wait_for(predicate, 45, period=0.1) return cid @@ -169,7 +170,7 @@ stream = Stream.from_kafka([TOPIC], ARGS) out = stream.sink_to_list() stream.start() - yield gen.sleep(1.1) + yield await_for(lambda: stream.started, 10, period=0.1) for i in range(10): yield gen.sleep(0.1) kafka.produce(TOPIC, b'value-%d' % i) @@ -182,14 +183,6 @@ kafka.flush() yield await_for(lambda: out[-1] == b'final message', 10, period=0.1) - 
stream._close_consumer() - kafka.produce(TOPIC, b'lost message') - kafka.flush() - # absolute sleep here, since we expect output list *not* to change - yield gen.sleep(1) - assert out[-1] == b'final message' - stream._close_consumer() - def test_kafka_batch(): j = random.randint(0, 10000) @@ -585,6 +578,8 @@ stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True) out1 = stream1.map(split).gather().sink_to_list() + time.sleep(1) # messages make ttheir way through kafka + stream1.start() wait_for(lambda: stream1.upstream.started, 10, period=0.1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/tests/test_sources.py new/streamz-0.6.6/streamz/tests/test_sources.py --- old/streamz-0.6.4/streamz/tests/test_sources.py 2022-07-27 20:06:05.000000000 +0200 +++ new/streamz-0.6.6/streamz/tests/test_sources.py 2026-04-07 16:10:13.000000000 +0200 @@ -4,7 +4,7 @@ from flaky import flaky import pytest from streamz import Source -from streamz.utils_test import wait_for, await_for, gen_test +from streamz.utils_test import free_port, wait_for, await_for, gen_test import socket @@ -48,6 +48,37 @@ @flaky(max_runs=3, min_passes=1) +def test_tcp_word_count_example(): + port = free_port() + s = Source.from_tcp(port) + out = s.map(bytes.split).flatten().frequencies().sink_to_list() + s.start() + wait_for(lambda: s.server is not None, 2, period=0.02) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect(("localhost", port)) + sock.send(b'data\n') + + with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock, + socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2): + sock.connect(("localhost", port)) + sock2.connect(("localhost", port)) + sock.send(b'data\n') + # regression test a bug in from_tcp where a second packet from + # the same socket is dropped due to the socket handler dying + sock.send(b'data\n') + sock2.send(b'data2\n') + + expected = [{b"data": 1}, {b"data": 2}, 
{b"data": 3}, {b"data": 3, b"data2": 1}] + + def fail_func(): + assert out == expected + + wait_for(lambda: out == expected, 2, fail_func=fail_func, period=0.01) + + + +@flaky(max_runs=3, min_passes=1) @gen_test(timeout=60) def test_tcp_async(): port = 9876 @@ -103,7 +134,7 @@ def test_process(): cmd = ["python", "-c", "for i in range(4): print(i, end='')"] s = Source.from_process(cmd, with_end=True) - if sys.platform != "win32": + if sys.platform != "win32" and sys.version_info < (3, 14): # don't know why - something with pytest and new processes policy = asyncio.get_event_loop_policy() watcher = asyncio.SafeChildWatcher() @@ -119,7 +150,7 @@ def test_process_str(): cmd = 'python -c "for i in range(4): print(i)"' s = Source.from_process(cmd) - if sys.platform != "win32": + if sys.platform != "win32" and sys.version_info < (3, 14): # don't know why - something with pytest and new processes policy = asyncio.get_event_loop_policy() watcher = asyncio.SafeChildWatcher() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz/utils_test.py new/streamz-0.6.6/streamz/utils_test.py --- old/streamz-0.6.4/streamz/utils_test.py 2021-05-19 03:31:17.000000000 +0200 +++ new/streamz-0.6.6/streamz/utils_test.py 2026-04-07 16:10:13.000000000 +0200 @@ -1,9 +1,10 @@ import asyncio from contextlib import contextmanager +import io import logging import os -import six import shutil +import socket import tempfile from time import time, sleep @@ -14,6 +15,13 @@ from .core import _io_loops, Stream +def free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('localhost', 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + @contextmanager def tmpfile(extension=''): extension = '.' 
+ extension.lstrip('.') @@ -43,16 +51,11 @@ @contextmanager def pristine_loop(): - IOLoop.clear_instance() - IOLoop.clear_current() - loop = IOLoop() - loop.make_current() + loop = IOLoop(make_current=False) try: yield loop finally: loop.close(all_fds=True) - IOLoop.clear_instance() - IOLoop.clear_current() def gen_test(timeout=10): @@ -85,7 +88,7 @@ if propagate is not None: orig_propagate = logger.propagate logger.propagate = propagate - sio = six.StringIO() + sio = io.StringIO() logger.handlers[:] = [logging.StreamHandler(sio)] logger.setLevel(level) try: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz.egg-info/PKG-INFO new/streamz-0.6.6/streamz.egg-info/PKG-INFO --- old/streamz-0.6.4/streamz.egg-info/PKG-INFO 2022-07-27 20:07:32.000000000 +0200 +++ new/streamz-0.6.6/streamz.egg-info/PKG-INFO 2026-04-07 16:12:02.000000000 +0200 @@ -1,15 +1,27 @@ -Metadata-Version: 2.1 +Metadata-Version: 2.4 Name: streamz -Version: 0.6.4 +Version: 0.6.6 Summary: Streams Home-page: http://github.com/python-streamz/streamz/ Maintainer: Matthew Rocklin Maintainer-email: [email protected] License: BSD Keywords: streams -Platform: UNKNOWN -Requires-Python: >=3.7 +Requires-Python: >=3.10 License-File: LICENSE.txt +Requires-Dist: tornado +Requires-Dist: toolz +Requires-Dist: zict +Dynamic: description +Dynamic: home-page +Dynamic: keywords +Dynamic: license +Dynamic: license-file +Dynamic: maintainer +Dynamic: maintainer-email +Dynamic: requires-dist +Dynamic: requires-python +Dynamic: summary Streamz ======= @@ -36,5 +48,3 @@ :target: https://pypi.python.org/pypi/streamz/ .. 
|RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py - - diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz.egg-info/SOURCES.txt new/streamz-0.6.6/streamz.egg-info/SOURCES.txt --- old/streamz-0.6.4/streamz.egg-info/SOURCES.txt 2022-07-27 20:07:33.000000000 +0200 +++ new/streamz-0.6.6/streamz.egg-info/SOURCES.txt 2026-04-07 16:12:02.000000000 +0200 @@ -18,6 +18,7 @@ docs/source/index.rst docs/source/plotting.rst docs/source/plugins.rst +docs/source/use-cases.rst docs/source/images/complex.svg docs/source/images/cyclic.svg docs/source/images/inc-dec-add-print.svg diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.4/streamz.egg-info/requires.txt new/streamz-0.6.6/streamz.egg-info/requires.txt --- old/streamz-0.6.4/streamz.egg-info/requires.txt 2022-07-27 20:07:32.000000000 +0200 +++ new/streamz-0.6.6/streamz.egg-info/requires.txt 2026-04-07 16:12:02.000000000 +0200 @@ -1,5 +1,3 @@ tornado toolz zict -six -setuptools ++++++ streamz-opensuse-python-exec.patch ++++++ --- /var/tmp/diff_new_pack.9T0ulg/_old 2026-04-28 12:00:04.746756867 +0200 +++ /var/tmp/diff_new_pack.9T0ulg/_new 2026-04-28 12:00:04.750757035 +0200 @@ -1,23 +1,23 @@ -Index: streamz-0.6.3/streamz/tests/test_sources.py +Index: streamz-0.6.6/streamz/tests/test_sources.py =================================================================== ---- streamz-0.6.3.orig/streamz/tests/test_sources.py -+++ streamz-0.6.3/streamz/tests/test_sources.py -@@ -101,7 +101,7 @@ def test_http(): +--- streamz-0.6.6.orig/streamz/tests/test_sources.py ++++ streamz-0.6.6/streamz/tests/test_sources.py +@@ -132,7 +132,7 @@ def test_http(): @gen_test(timeout=60) def test_process(): - cmd = ["python", "-c", "for i in range(4): print(i, end='')"] + cmd = [sys.executable, "-c", "for i in range(4): 
print(i, end='')"] s = Source.from_process(cmd, with_end=True) - if sys.platform != "win32": + if sys.platform != "win32" and sys.version_info < (3, 14): # don't know why - something with pytest and new processes -@@ -117,7 +117,7 @@ def test_process(): +@@ -148,7 +148,7 @@ def test_process(): @gen_test(timeout=60) def test_process_str(): - cmd = 'python -c "for i in range(4): print(i)"' + cmd = f'{sys.executable} -c "for i in range(4): print(i)"' s = Source.from_process(cmd) - if sys.platform != "win32": + if sys.platform != "win32" and sys.version_info < (3, 14): # don't know why - something with pytest and new processes
