Script 'mail_helper' called by obssrc

Hello community,

here is the log from the commit of package python-streamz for openSUSE:Factory checked in at 2022-08-25 15:09:14
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-streamz (Old)
 and      /work/SRC/openSUSE:Factory/.python-streamz.new.2083 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-streamz" Thu Aug 25 15:09:14 2022 rev:9 rq:999153 version:0.6.4 Changes: -------- --- /work/SRC/openSUSE:Factory/python-streamz/python-streamz.changes 2022-03-31 17:19:05.476999613 +0200 +++ /work/SRC/openSUSE:Factory/.python-streamz.new.2083/python-streamz.changes 2022-08-25 15:09:29.085264616 +0200 @@ -1,0 +2,12 @@ +Thu Aug 25 05:00:02 UTC 2022 - Steve Kowalik <steven.kowa...@suse.com> + +- Update to 0.6.4: + * No upstream changelog. +- Drop patch streamz-pr434-asyncdask.patch: + * Included upstream. +- Add patch support-new-distributed.patch: + * Also import cleanup when using loop, since distributed forces + calling it in your own context now. +- Ignore graph tests due to weird failures. + +------------------------------------------------------------------- Old: ---- streamz-0.6.3.tar.gz streamz-pr434-asyncdask.patch New: ---- streamz-0.6.4.tar.gz support-new-distributed.patch ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-streamz.spec ++++++ --- /var/tmp/diff_new_pack.obU3bd/_old 2022-08-25 15:09:29.913266365 +0200 +++ /var/tmp/diff_new_pack.obU3bd/_new 2022-08-25 15:09:29.917266374 +0200 @@ -19,17 +19,16 @@ %{?!python_module:%define python_module() python3-%{**}} %define skip_python2 1 Name: python-streamz -Version: 0.6.3 +Version: 0.6.4 Release: 0 Summary: Tool to build continuous data pipelines License: BSD-3-Clause -Group: Development/Languages/Python URL: https://github.com/python-streamz/streamz/ Source: https://files.pythonhosted.org/packages/source/s/streamz/streamz-%{version}.tar.gz -# PATCH-FIX-UPSTREAM streamz-pr434-asyncdask.patch -- gh#python-streamz/streamz#434, gh#python-streamz/streamz#439 -Patch1: streamz-pr434-asyncdask.patch # PATCH-FIX-OPENSUSE streamz-opensuse-python-exec.patch -- call tests with correct flavor -Patch2: streamz-opensuse-python-exec.patch +Patch0: streamz-opensuse-python-exec.patch +# PATCH-FIX-OPENSUSE New distributed now requires to call cleanup with loop +Patch1: support-new-distributed.patch BuildRequires: %{python_module setuptools} BuildRequires: fdupes BuildRequires: python-rpm-macros @@ -92,7 +91,7 @@ fi # flaky: some tests are very fragile when run server-side donttest+=" or test_tcp" -%pytest -m "not network" --asyncio-mode=auto --force-flaky --max-runs=10 --no-success-flaky-report -rsfE ${$python_flags} -k "not ($donttest)" +%pytest -m "not network" --asyncio-mode=auto --force-flaky --max-runs=10 --no-success-flaky-report -rsfE ${$python_flags} --ignore streamz/tests/test_graph.py -k "not ($donttest)" %files %{python_files} %doc README.rst ++++++ streamz-0.6.3.tar.gz -> streamz-0.6.4.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/PKG-INFO new/streamz-0.6.4/PKG-INFO --- old/streamz-0.6.3/PKG-INFO 2021-10-04 16:47:26.663052300 +0200 +++ new/streamz-0.6.4/PKG-INFO 2022-07-27 20:07:33.221615000 +0200 @@ -1,37 +1,40 @@ -Metadata-Version: 1.2 +Metadata-Version: 2.1 Name: streamz -Version: 0.6.3 +Version: 0.6.4 Summary: Streams Home-page: http://github.com/python-streamz/streamz/ Maintainer: Matthew Rocklin Maintainer-email: mrock...@gmail.com License: BSD -Description: Streamz - ======= - - |Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI| - - Streamz helps you build pipelines to manage continuous streams of data. 
It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on. - - Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data. - - To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_. - - LICENSE - ------- - - BSD-3 Clause - - .. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg - :target: https://github.com/python-streamz/streamz/actions - .. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest - :target: http://streamz.readthedocs.org/en/latest/ - :alt: Documentation Status - .. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg - :target: https://pypi.python.org/pypi/streamz/ - .. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green - :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py - Keywords: streams Platform: UNKNOWN -Requires-Python: >=3.6 +Requires-Python: >=3.7 +License-File: LICENSE.txt + +Streamz +======= + +|Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI| + +Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on. + +Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data. + +To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_. + +LICENSE +------- + +BSD-3 Clause + +.. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg + :target: https://github.com/python-streamz/streamz/actions +.. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest + :target: http://streamz.readthedocs.org/en/latest/ + :alt: Documentation Status +.. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg + :target: https://pypi.python.org/pypi/streamz/ +.. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green + :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py + + diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/docs/source/api.rst new/streamz-0.6.4/docs/source/api.rst --- old/streamz-0.6.3/docs/source/api.rst 2020-12-01 17:08:16.000000000 +0100 +++ new/streamz-0.6.4/docs/source/api.rst 2021-11-01 21:26:17.000000000 +0100 @@ -43,7 +43,11 @@ .. automethod:: Stream.emit .. automethod:: Stream.frequencies .. automethod:: Stream.register_api +.. automethod:: Stream.sink .. automethod:: Stream.sink_to_list +.. automethod:: Stream.sink_to_textfile +.. automethod:: Stream.to_websocket +.. automethod:: Stream.to_mqtt .. automethod:: Stream.update .. 
automethod:: Stream.visualize @@ -55,7 +59,9 @@ filenames from_kafka from_kafka_batched + from_mqtt from_process + from_websocket from_textfile from_tcp from_http_server diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/docs/source/core.rst new/streamz-0.6.4/docs/source/core.rst --- old/streamz-0.6.3/docs/source/core.rst 2020-09-25 16:18:17.000000000 +0200 +++ new/streamz-0.6.4/docs/source/core.rst 2021-12-30 22:18:19.000000000 +0100 @@ -297,7 +297,7 @@ example to this is ``sink``, which is intended to be used with side effects and will stick around even without a reference. -.. note:: Sink streams store themselves in ``streamz.core._global_sinks``. You +.. note:: Sink streams store themselves in ``streamz.sinks._global_sinks``. You can remove them permanently by clearing that collection. .. code-block:: python diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/setup.py new/streamz-0.6.4/setup.py --- old/streamz-0.6.3/setup.py 2021-10-04 16:46:46.000000000 +0200 +++ new/streamz-0.6.4/setup.py 2022-07-27 20:06:45.000000000 +0200 @@ -9,7 +9,7 @@ setup(name='streamz', - version='0.6.3', + version='0.6.4', description='Streams', url='http://github.com/python-streamz/streamz/', maintainer='Matthew Rocklin', @@ -17,7 +17,7 @@ license='BSD', keywords='streams', packages=packages + tests, - python_requires='>=3.6', + python_requires='>=3.7', long_description=(open('README.rst').read() if exists('README.rst') else ''), install_requires=list(open('requirements.txt').read().strip().split('\n')), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/__init__.py new/streamz-0.6.4/streamz/__init__.py --- old/streamz-0.6.3/streamz/__init__.py 2021-10-04 16:46:46.000000000 +0200 +++ new/streamz-0.6.4/streamz/__init__.py 2022-07-27 20:06:14.000000000 +0200 @@ -13,4 +13,4 @@ except ImportError: pass -__version__ = '0.6.3' +__version__ = '0.6.4' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/core.py new/streamz-0.6.4/streamz/core.py --- old/streamz-0.6.3/streamz/core.py 2021-09-11 20:05:05.000000000 +0200 +++ new/streamz-0.6.4/streamz/core.py 2021-12-30 22:18:19.000000000 +0100 @@ -1,3 +1,4 @@ +import asyncio from collections import deque, defaultdict from datetime import timedelta import functools @@ -485,15 +486,14 @@ finally: thread_state.asynchronous = ts_async else: - @gen.coroutine - def _(): + async def _(): thread_state.asynchronous = True try: - result = yield self._emit(x, metadata=metadata) + result = await asyncio.gather(*self._emit(x, metadata=metadata)) finally: del thread_state.asynchronous + return result - raise gen.Return(result) sync(self.loop, _) def update(self, x, who=None, metadata=None): @@ -1902,89 +1902,6 @@ yield self._emit(x, self.next_metadata) -@Stream.register_api() -class to_kafka(Stream): - """ Writes data in the stream to Kafka - - This stream accepts a string or bytes object. Call ``flush`` to ensure all - messages are pushed. Responses from Kafka are pushed downstream. 
- - Parameters - ---------- - topic : string - The topic which to write - producer_config : dict - Settings to set up the stream, see - https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration - https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - Examples: - bootstrap.servers: Connection string (host:port) to Kafka - - Examples - -------- - >>> from streamz import Stream - >>> ARGS = {'bootstrap.servers': 'localhost:9092'} - >>> source = Stream() - >>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS) - <to_kafka> - >>> for i in range(10): - ... source.emit(i) - >>> kafka.flush() - """ - def __init__(self, upstream, topic, producer_config, **kwargs): - import confluent_kafka as ck - - self.topic = topic - self.producer = ck.Producer(producer_config) - - kwargs["ensure_io_loop"] = True - Stream.__init__(self, upstream, **kwargs) - self.stopped = False - self.polltime = 0.2 - self.loop.add_callback(self.poll) - self.futures = [] - - @gen.coroutine - def poll(self): - while not self.stopped: - # executes callbacks for any delivered data, in this thread - # if no messages were sent, nothing happens - self.producer.poll(0) - yield gen.sleep(self.polltime) - - def update(self, x, who=None, metadata=None): - future = gen.Future() - self.futures.append(future) - - @gen.coroutine - def _(): - while True: - try: - # this runs asynchronously, in C-K's thread - self.producer.produce(self.topic, x, callback=self.cb) - return - except BufferError: - yield gen.sleep(self.polltime) - except Exception as e: - future.set_exception(e) - return - - self.loop.add_callback(_) - return future - - @gen.coroutine - def cb(self, err, msg): - future = self.futures.pop(0) - if msg is not None and msg.value() is not None: - future.set_result(None) - yield self._emit(msg.value()) - else: - future.set_exception(err or msg.error()) - - def flush(self, timeout=-1): - self.producer.flush(timeout) - - def sync(loop, func, *args, **kwargs): """ Run coroutine in loop running in separate thread. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/river.py new/streamz-0.6.4/streamz/river.py --- old/streamz-0.6.3/streamz/river.py 1970-01-01 01:00:00.000000000 +0100 +++ new/streamz-0.6.4/streamz/river.py 2021-12-30 22:18:19.000000000 +0100 @@ -0,0 +1,62 @@ +from . import Stream + + +# TODO: most river classes support batches, e.g., learn_many, more efficiently + + +class RiverTransform(Stream): + """Pass data through one or more River transforms""" + + def __init__(self, model, **kwargs): + super().__init__(**kwargs) + self.model = model + + def update(self, x, who=None, metadata=None): + out = self.model.transform_one(*x) + self.emit(out) + + +class RiverTrain(Stream): + + def __init__(self, model, metric=None, pass_model=False, **kwargs): + """ + + If metric and pass_model are both defaults, this is effectively + a sink. + + :param model: river model or pipeline + :param metric: river metric + If given, it is emitted on every sample + :param pass_model: bool + If True, the (updated) model if emitted for each sample + """ + super().__init__(**kwargs) + self.model = model + if pass_model and metric is not None: + raise TypeError + self.pass_model = pass_model + self.metric = metric + + def update(self, x, who=None, metadata=None): + """ + :param x: tuple + (x, [y[, w]) floats for single sample. 
Include + """ + self.model.learn_one(*x) + if self.metric: + yp = self.model.predict_one(x[0]) + weights = x[2] if len(x) > 1 else 1.0 + return self._emit(self.metric.update(x[1], yp, weights).get(), metadata=metadata) + if self.pass_model: + return self._emit(self.model, metadata=metadata) + + +class RiverPredict(Stream): + + def __init__(self, model, **kwargs): + super().__init__(**kwargs) + self.model = model + + def update(self, x, who=None, metadata=None): + out = self.model.predict_one(x) + return self._emit(out, metadata=metadata) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/sinks.py new/streamz-0.6.4/streamz/sinks.py --- old/streamz-0.6.3/streamz/sinks.py 2021-05-18 23:35:26.000000000 +0200 +++ new/streamz-0.6.4/streamz/sinks.py 2021-11-01 22:36:12.000000000 +0100 @@ -4,6 +4,7 @@ from tornado import gen from streamz import Stream +from streamz.core import sync # sinks add themselves here to avoid being garbage-collected _global_sinks = set() @@ -109,3 +110,164 @@ def update(self, x, who=None, metadata=None): self._fp.write(x + self._end) + + +@Stream.register_api() +class to_kafka(Stream): + """ Writes data in the stream to Kafka + + This stream accepts a string or bytes object. Call ``flush`` to ensure all + messages are pushed. Responses from Kafka are pushed downstream. + + Parameters + ---------- + topic : string + The topic which to write + producer_config : dict + Settings to set up the stream, see + https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration + https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + Examples: + bootstrap.servers: Connection string (host:port) to Kafka + + Examples + -------- + >>> from streamz import Stream + >>> ARGS = {'bootstrap.servers': 'localhost:9092'} + >>> source = Stream() + >>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS) + <to_kafka> + >>> for i in range(10): + ... source.emit(i) + >>> kafka.flush() + """ + def __init__(self, upstream, topic, producer_config, **kwargs): + import confluent_kafka as ck + + self.topic = topic + self.producer = ck.Producer(producer_config) + + kwargs["ensure_io_loop"] = True + Stream.__init__(self, upstream, **kwargs) + self.stopped = False + self.polltime = 0.2 + self.loop.add_callback(self.poll) + self.futures = [] + + @gen.coroutine + def poll(self): + while not self.stopped: + # executes callbacks for any delivered data, in this thread + # if no messages were sent, nothing happens + self.producer.poll(0) + yield gen.sleep(self.polltime) + + def update(self, x, who=None, metadata=None): + future = gen.Future() + self.futures.append(future) + + @gen.coroutine + def _(): + while True: + try: + # this runs asynchronously, in C-K's thread + self.producer.produce(self.topic, x, callback=self.cb) + return + except BufferError: + yield gen.sleep(self.polltime) + except Exception as e: + future.set_exception(e) + return + + self.loop.add_callback(_) + return future + + @gen.coroutine + def cb(self, err, msg): + future = self.futures.pop(0) + if msg is not None and msg.value() is not None: + future.set_result(None) + yield self._emit(msg.value()) + else: + future.set_exception(err or msg.error()) + + def flush(self, timeout=-1): + self.producer.flush(timeout) + + +@Stream.register_api() +class to_websocket(Sink): + """Write bytes data to websocket + + The websocket will be opened on first call, and kept open. Should + it close at some point, future writes will fail. 
+ + Requires the ``websockets`` package. + + :param uri: str + Something like "ws://host:port". Use "wss:" to allow TLS. + :param ws_kwargs: dict + Further kwargs to pass to ``websockets.connect``, please + read its documentation. + :param kwargs: + Passed to superclass + """ + + def __init__(self, upstream, uri, ws_kwargs=None, **kwargs): + self.uri = uri + self.ws_kw = ws_kwargs + self.ws = None + super().__init__(upstream, ensure_io_loop=True, **kwargs) + + async def update(self, x, who=None, metadata=None): + import websockets + if self.ws is None: + self.ws = await websockets.connect(self.uri, **(self.ws_kw or {})) + await self.ws.send(x) + + def destroy(self): + super().destroy() + if self.ws is not None: + sync(self.loop, self.ws.protocol.close) + self.ws = None + + +@Stream.register_api() +class to_mqtt(Sink): + """ + Send data to MQTT broker + + See also ``sources.from_mqtt``. + + Requires ``paho.mqtt`` + + :param host: str + :param port: int + :param topic: str + :param keepalive: int + See mqtt docs - to keep the channel alive + :param client_kwargs: + Passed to the client's ``connect()`` method + """ + def __init__(self, upstream, host, port, topic, keepalive=60, client_kwargs=None, + **kwargs): + self.host = host + self.port = port + self.c_kw = client_kwargs or {} + self.client = None + self.topic = topic + self.keepalive = keepalive + super().__init__(upstream, ensure_io_loop=True, **kwargs) + + def update(self, x, who=None, metadata=None): + import paho.mqtt.client as mqtt + if self.client is None: + self.client = mqtt.Client() + self.client.connect(self.host, self.port, self.keepalive, **self.c_kw) + # TODO: wait on successful delivery + self.client.publish(self.topic, x) + + def destroy(self): + self.client.disconnect() + self.client = None + super().destroy() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/sources.py new/streamz-0.6.4/streamz/sources.py --- old/streamz-0.6.3/streamz/sources.py 2021-05-18 23:35:26.000000000 +0200 +++ new/streamz-0.6.4/streamz/sources.py 2021-11-02 14:02:22.000000000 +0100 @@ -1,11 +1,12 @@ import asyncio from glob import glob +import queue import os import time from tornado import gen import weakref -from .core import Stream, convert_interval, RefCounter +from .core import Stream, convert_interval, RefCounter, sync def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False): @@ -448,10 +449,14 @@ self.stopped = False self.consumer = ck.Consumer(self.cpars) self.consumer.subscribe(self.topics) - weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer)) + weakref.finalize( + self, lambda consumer=self.consumer: _close_consumer(consumer) + ) tp = ck.TopicPartition(self.topics[0], 0, 0) - # blocks for consumer thread to come up + # blocks for consumer thread to come up and invoke poll to + # establish connection with broker to fetch oauth token for kafka + self.consumer.poll(timeout=1) self.consumer.get_watermark_offsets(tp) self.loop.add_callback(self.poll_kafka) @@ -478,7 +483,8 @@ max_batch_size=10000, keys=False, engine=None, **kwargs): self.consumer_params = consumer_params - # Override the auto-commit config to enforce custom streamz checkpointing + # Override the auto-commit config to enforce custom streamz + # checkpointing self.consumer_params['enable.auto.commit'] = 'false' if 'auto.offset.reset' not in self.consumer_params.keys(): consumer_params['auto.offset.reset'] = 'latest' @@ -586,7 +592,9 @@ self.stopped = False 
tp = ck.TopicPartition(self.topic, 0, 0) - # blocks for consumer thread to come up + # blocks for consumer thread to come up and invoke poll to establish + # connection with broker to fetch oauth token for kafka + self.consumer.poll(timeout=1) self.consumer.get_watermark_offsets(tp) self.loop.add_callback(self.poll_kafka) @@ -779,3 +787,135 @@ break await asyncio.gather(*self._emit(x)) self.stopped = True + + +@Stream.register_api() +class from_websocket(Source): + """Read binary data from a websocket + + This source will accept connections on a given port and handle messages + coming in. + + The websockets library must be installed. + + :param host: str + Typically "localhost" + :param port: int + Which port to listen on (must be available) + :param serve_kwargs: dict + Passed to ``websockets.serve`` + :param kwargs: + Passed to superclass + """ + + def __init__(self, host, port, serve_kwargs=None, **kwargs): + self.host = host + self.port = port + self.s_kw = serve_kwargs + self.server = None + super().__init__(**kwargs) + + @gen.coroutine + def _read(self, ws, path): + while not self.stopped: + data = yield ws.recv() + yield self._emit(data) + + async def run(self): + import websockets + self.server = await websockets.serve( + self._read, self.host, self.port, **(self.s_kw or {}) + ) + + def stop(self): + self.server.close() + sync(self.loop, self.server.wait_closed) + + +@Stream.register_api() +class from_q(Source): + """Source events from a threading.Queue, running another event framework + + The queue is polled, i.e., there is a latency/overhead tradeoff, since + we cannot use ``await`` directly with a multithreaded queue. + + Allows mixing of another event loop, for example pyqt, on another thread. + Note that, by default, a streamz.Source such as this one will start + an event loop in a new thread, unless otherwise specified. + """ + + def __init__(self, q, sleep_time=0.01, **kwargs): + """ + :param q: threading.Queue + Any items pushed into here will become streamz events + :param sleep_time: int + Sets how long we wait before checking the input queue when + empty (in s) + :param kwargs: + passed to streamz.Source + """ + self.q = q + self.sleep = sleep_time + super().__init__(**kwargs) + + async def _run(self): + """Poll threading queue for events + This uses check-and-wait, but overhead is low. Could maybe have + a sleep-free version with an threading.Event. + """ + try: + out = self.q.get_nowait() + await self.emit(out, asynchronous=True) + except queue.Empty: + await asyncio.sleep(self.sleep) + + +@Stream.register_api() +class from_mqtt(from_q): + """Read from MQTT source + + See https://en.wikipedia.org/wiki/MQTT for a description of the protocol + and its uses. + + See also ``sinks.to_mqtt``. + + Requires ``paho.mqtt`` + + The outputs are ``paho.mqtt.client.MQTTMessage`` instances, which each have + attributes timestamp, payload, topic, ... + + NB: paho.mqtt.python runs on its own thread in this implementation. 
We may + wish to instead call client.loop() directly + + :param host: str + :param port: int + :param topic: str + (May in the future support a list of topics) + :param keepalive: int + See mqtt docs - to keep the channel alive + :param client_kwargs: + Passed to the client's ``connect()`` method + """ + def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs): + self.host = host + self.port = port + self.keepalive = keepalive + self.topic = topic + self.client_kwargs = client_kwargs + super().__init__(q=queue.Queue(), **kwargs) + + def _on_connect(self, client, userdata, flags, rc): + client.subscribe(self.topic) + + def _on_message(self, client, userdata, msg): + self.q.put(msg) + + async def run(self): + import paho.mqtt.client as mqtt + client = mqtt.Client() + client.on_connect = self._on_connect + client.on_message = self._on_message + client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {})) + client.loop_start() + await super().run() + client.disconnect() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/tests/test_dask.py new/streamz-0.6.4/streamz/tests/test_dask.py --- old/streamz-0.6.3/streamz/tests/test_dask.py 2021-05-18 23:35:26.000000000 +0200 +++ new/streamz-0.6.4/streamz/tests/test_dask.py 2021-12-30 22:18:19.000000000 +0100 @@ -1,3 +1,4 @@ +import asyncio from operator import add import random import time @@ -16,21 +17,21 @@ @gen_cluster(client=True) -def test_map(c, s, a, b): +async def test_map(c, s, a, b): source = Stream(asynchronous=True) futures = scatter(source).map(inc) futures_L = futures.sink_to_list() L = futures.gather().sink_to_list() for i in range(5): - yield source.emit(i) + await source.emit(i) assert L == [1, 2, 3, 4, 5] assert all(isinstance(f, Future) for f in futures_L) @gen_cluster(client=True) -def test_map_on_dict(c, s, a, b): +async def test_map_on_dict(c, s, a, b): # dask treats dicts differently, so we have to make sure # the user sees no difference in the streamz api. # Regression test against #336 @@ -43,7 +44,7 @@ L = futures.gather().sink_to_list() for i in range(5): - yield source.emit({"i": i}) + await source.emit({"i": i}) assert len(L) == 5 for i, item in enumerate(sorted(L, key=lambda x: x["x"])): @@ -52,7 +53,7 @@ @gen_cluster(client=True) -def test_partition_then_scatter_async(c, s, a, b): +async def test_partition_then_scatter_async(c, s, a, b): # Ensure partition w/ timeout before scatter works correctly for # asynchronous start = time.monotonic() @@ -63,10 +64,10 @@ rc = RefCounter(loop=source.loop) for i in range(3): - yield source.emit(i, metadata=[{'ref': rc}]) + await source.emit(i, metadata=[{'ref': rc}]) while rc.count != 0 and time.monotonic() - start < 1.: - yield gen.sleep(1e-2) + await gen.sleep(1e-2) assert L == [1, 2, 3] @@ -92,7 +93,7 @@ @gen_cluster(client=True) -def test_non_unique_emit(c, s, a, b): +async def test_non_unique_emit(c, s, a, b): """Regression for https://github.com/python-streamz/streams/issues/397 Non-unique stream entries still need to each be processed. 
@@ -103,28 +104,28 @@ for _ in range(3): # Emit non-unique values - yield source.emit(0) + await source.emit(0) assert len(L) == 3 assert L[0] != L[1] or L[0] != L[2] @gen_cluster(client=True) -def test_scan(c, s, a, b): +async def test_scan(c, s, a, b): source = Stream(asynchronous=True) futures = scatter(source).map(inc).scan(add) futures_L = futures.sink_to_list() L = futures.gather().sink_to_list() for i in range(5): - yield source.emit(i) + await source.emit(i) assert L == [1, 3, 6, 10, 15] assert all(isinstance(f, Future) for f in futures_L) @gen_cluster(client=True) -def test_scan_state(c, s, a, b): +async def test_scan_state(c, s, a, b): source = Stream(asynchronous=True) def f(acc, i): @@ -133,33 +134,33 @@ L = scatter(source).scan(f, returns_state=True).gather().sink_to_list() for i in range(3): - yield source.emit(i) + await source.emit(i) assert L == [0, 1, 3] @gen_cluster(client=True) -def test_zip(c, s, a, b): +async def test_zip(c, s, a, b): a = Stream(asynchronous=True) b = Stream(asynchronous=True) c = scatter(a).zip(scatter(b)) L = c.gather().sink_to_list() - yield a.emit(1) - yield b.emit('a') - yield a.emit(2) - yield b.emit('b') + await a.emit(1) + await b.emit('a') + await a.emit(2) + await b.emit('b') assert L == [(1, 'a'), (2, 'b')] @gen_cluster(client=True) -def test_accumulate(c, s, a, b): +async def test_accumulate(c, s, a, b): source = Stream(asynchronous=True) L = source.scatter().accumulate(lambda acc, x: acc + x, with_state=True).gather().sink_to_list() for i in range(3): - yield source.emit(i) + await source.emit(i) assert L[-1][1] == 3 @@ -169,10 +170,9 @@ source = Stream() L = source.scatter().map(inc).gather().sink_to_list() - @gen.coroutine - def f(): + async def f(): for i in range(10): - yield source.emit(i, asynchronous=True) + await source.emit(i, asynchronous=True) sync(loop, f) @@ -193,24 +193,24 @@ @gen_cluster(client=True, nthreads=[('127.0.0.1', 1)] * 2) -def test_buffer(c, s, a, b): +async def test_buffer(c, s, a, b): source = Stream(asynchronous=True) L = source.scatter().map(slowinc, delay=0.5).buffer(5).gather().sink_to_list() start = time.time() for i in range(5): - yield source.emit(i) + await source.emit(i) end = time.time() assert end - start < 0.5 for i in range(5, 10): - yield source.emit(i) + await source.emit(i) end2 = time.time() assert end2 - start > (0.5 / 3) while len(L) < 10: - yield gen.sleep(0.01) + await gen.sleep(0.01) assert time.time() - start < 5 assert L == list(map(inc, range(10))) @@ -242,7 +242,7 @@ @pytest.mark.xfail(reason='') -def test_stream_shares_client_loop(loop): # noqa: F811 +async def test_stream_shares_client_loop(loop): # noqa: F811 with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: # noqa: F841 source = Stream() @@ -251,7 +251,7 @@ @gen_cluster(client=True) -def test_starmap(c, s, a, b): +async def test_starmap(c, s, a, b): def add(x, y, z=0): return x + y + z @@ -259,6 +259,6 @@ L = source.scatter().starmap(add, z=10).gather().sink_to_list() for i in range(5): - yield source.emit((i, i)) + await source.emit((i, i)) assert L == [10, 12, 14, 16, 18] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/tests/test_kafka.py new/streamz-0.6.4/streamz/tests/test_kafka.py --- old/streamz-0.6.3/streamz/tests/test_kafka.py 2021-05-18 23:35:26.000000000 +0200 +++ new/streamz-0.6.4/streamz/tests/test_kafka.py 2022-07-27 20:06:05.000000000 +0200 @@ -1,3 +1,4 @@ +import asyncio import atexit from contextlib import contextmanager 
from flaky import flaky @@ -217,7 +218,7 @@ @gen_cluster(client=True, timeout=60) -def test_kafka_dask_batch(c, s, w1, w2): +async def test_kafka_dask_batch(c, s, w1, w2): j = random.randint(0, 10000) ARGS = {'bootstrap.servers': 'localhost:9092', 'group.id': 'streamz-test%i' % j} @@ -227,15 +228,15 @@ asynchronous=True, dask=True) out = stream.gather().sink_to_list() stream.start() - yield gen.sleep(5) # this frees the loop while dask workers report in + await asyncio.sleep(5) # this frees the loop while dask workers report in assert isinstance(stream, DaskStream) for i in range(10): kafka.produce(TOPIC, b'value-%d' % i) kafka.flush() - yield await_for(lambda: any(out), 10, period=0.2) + await await_for(lambda: any(out), 10, period=0.2) assert {'key': None, 'value': b'value-1'} in out[0] stream.stop() - yield gen.sleep(0) + await asyncio.sleep(0) stream.upstream.upstream.consumer.close() @@ -382,7 +383,7 @@ @gen_cluster(client=True, timeout=60) -def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2): +async def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2): ''' Testing whether Dask's scatter and gather works in conformity with the reference counting checkpointing implementation. @@ -403,23 +404,23 @@ kafka.produce(TOPIC, b'value-%d' % i) kafka.flush() stream1 = Stream.from_kafka_batched(TOPIC, ARGS1, asynchronous=True, - dask=True) + dask=True) out1 = stream1.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list() stream1.start() - yield await_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2) + await await_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2) stream1.upstream.stopped = True stream2 = Stream.from_kafka_batched(TOPIC, ARGS1, asynchronous=True, - dask=True) + dask=True) out2 = stream2.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list() stream2.start() time.sleep(5) assert len(out2) == 0 stream2.upstream.stopped = True stream3 = Stream.from_kafka_batched(TOPIC, ARGS2, asynchronous=True, - dask=True) + dask=True) out3 = stream3.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list() stream3.start() - yield await_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2) + await await_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2) stream3.upstream.stopped = True diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/tests/test_sinks.py new/streamz-0.6.4/streamz/tests/test_sinks.py --- old/streamz-0.6.3/streamz/tests/test_sinks.py 2021-05-18 23:35:26.000000000 +0200 +++ new/streamz-0.6.4/streamz/tests/test_sinks.py 2022-07-27 20:06:05.000000000 +0200 @@ -3,7 +3,7 @@ import pytest from streamz import Stream from streamz.sinks import _global_sinks, Sink -from streamz.utils_test import tmpfile +from streamz.utils_test import tmpfile, wait_for def test_sink_with_args_and_kwargs(): @@ -71,3 +71,35 @@ del sink assert ref() is None + + +def test_ws_roundtrip(): + pytest.importorskip("websockets") + s0 = Stream.from_websocket("localhost", 8989, start=True) + l = s0.sink_to_list() + + data = [b'0123'] * 4 + s = Stream.from_iterable(data) + s.to_websocket("ws://localhost:8989") + s.start() + + wait_for(lambda: data == l, timeout=1) + s.stop() + s0.stop() + + +def test_mqtt_roundtrip(): + pytest.importorskip("paho.mqtt.client") + s0 = Stream.from_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature") + l = s0.map(lambda msg: msg.payload).sink_to_list() + s0.start() + + data = [b'0123'] * 4 + s = Stream.from_iterable(data) + 
s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature") + s.start() + + wait_for(lambda: data == l, timeout=1) + s.stop() + s0.stop() + diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz.egg-info/PKG-INFO new/streamz-0.6.4/streamz.egg-info/PKG-INFO --- old/streamz-0.6.3/streamz.egg-info/PKG-INFO 2021-10-04 16:47:26.000000000 +0200 +++ new/streamz-0.6.4/streamz.egg-info/PKG-INFO 2022-07-27 20:07:32.000000000 +0200 @@ -1,37 +1,40 @@ -Metadata-Version: 1.2 +Metadata-Version: 2.1 Name: streamz -Version: 0.6.3 +Version: 0.6.4 Summary: Streams Home-page: http://github.com/python-streamz/streamz/ Maintainer: Matthew Rocklin Maintainer-email: mrock...@gmail.com License: BSD -Description: Streamz - ======= - - |Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI| - - Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on. - - Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data. - - To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_. - - LICENSE - ------- - - BSD-3 Clause - - .. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg - :target: https://github.com/python-streamz/streamz/actions - .. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest - :target: http://streamz.readthedocs.org/en/latest/ - :alt: Documentation Status - .. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg - :target: https://pypi.python.org/pypi/streamz/ - .. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green - :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py - Keywords: streams Platform: UNKNOWN -Requires-Python: >=3.6 +Requires-Python: >=3.7 +License-File: LICENSE.txt + +Streamz +======= + +|Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI| + +Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on. + +Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data. + +To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_. + +LICENSE +------- + +BSD-3 Clause + +.. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg + :target: https://github.com/python-streamz/streamz/actions +.. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest + :target: http://streamz.readthedocs.org/en/latest/ + :alt: Documentation Status +.. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg + :target: https://pypi.python.org/pypi/streamz/ +.. 
|RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green + :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py + + diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz.egg-info/SOURCES.txt new/streamz-0.6.4/streamz.egg-info/SOURCES.txt --- old/streamz-0.6.3/streamz.egg-info/SOURCES.txt 2021-10-04 16:47:26.000000000 +0200 +++ new/streamz-0.6.4/streamz.egg-info/SOURCES.txt 2022-07-27 20:07:33.000000000 +0200 @@ -31,6 +31,7 @@ streamz/graph.py streamz/orderedweakset.py streamz/plugins.py +streamz/river.py streamz/sinks.py streamz/sources.py streamz/utils.py ++++++ support-new-distributed.patch ++++++ Index: streamz-0.6.4/streamz/tests/test_dask.py =================================================================== --- streamz-0.6.4.orig/streamz/tests/test_dask.py +++ streamz-0.6.4/streamz/tests/test_dask.py @@ -13,7 +13,7 @@ from streamz import RefCounter, Stream from distributed import Future, Client from distributed.utils import sync -from distributed.utils_test import gen_cluster, inc, cluster, loop, slowinc # noqa: F401 +from distributed.utils_test import cleanup, gen_cluster, inc, cluster, loop, slowinc # noqa: F401 @gen_cluster(client=True) Index: streamz-0.6.4/streamz/tests/test_core.py =================================================================== --- streamz-0.6.4.orig/streamz/tests/test_core.py +++ streamz-0.6.4/streamz/tests/test_core.py @@ -19,7 +19,7 @@ from streamz import RefCounter from streamz.sources import sink_to_file from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401 clean, await_for, metadata, wait_for) # noqa: F401 -from distributed.utils_test import loop # noqa: F401 +from distributed.utils_test import cleanup, loop # noqa: F401 def test_basic():
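
For context on support-new-distributed.patch: with the distributed version this update targets, test modules that pull in the ``loop`` pytest fixture from distributed.utils_test also need to import the ``cleanup`` fixture into their own namespace, since distributed now expects it to be available in the importing module's own context. Below is a minimal sketch of that import pattern; the module and test body are hypothetical illustrations, not code from streamz or this package.

    # Hypothetical test module showing the import pattern added by
    # support-new-distributed.patch; the test itself is illustrative only.
    from streamz import Stream
    # cleanup must be imported alongside loop so that pytest can resolve
    # the loop fixture within this module's own namespace.
    from distributed.utils_test import cleanup, loop  # noqa: F401


    def test_simple_pipeline(loop):
        # The loop fixture is requested only to show the dependency;
        # a plain synchronous Stream does not need it for this pipeline.
        source = Stream()
        out = source.map(lambda x: x * 2).sink_to_list()
        for i in range(3):
            source.emit(i)
        assert out == [0, 2, 4]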