Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-streamz for openSUSE:Factory checked in at 2022-08-25 15:09:14
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-streamz (Old)
 and      /work/SRC/openSUSE:Factory/.python-streamz.new.2083 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-streamz"

Thu Aug 25 15:09:14 2022 rev:9 rq:999153 version:0.6.4

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-streamz/python-streamz.changes    2022-03-31 17:19:05.476999613 +0200
+++ /work/SRC/openSUSE:Factory/.python-streamz.new.2083/python-streamz.changes  2022-08-25 15:09:29.085264616 +0200
@@ -1,0 +2,12 @@
+Thu Aug 25 05:00:02 UTC 2022 - Steve Kowalik <steven.kowa...@suse.com>
+
+- Update to 0.6.4:
+  * No upstream changelog.
+- Drop patch streamz-pr434-asyncdask.patch:
+  * Included upstream.
+- Add patch support-new-distributed.patch:
+  * Also import cleanup when using loop, since distributed forces
+    calling it in your own context now.
+- Ignore graph tests due to weird failures.
+
+-------------------------------------------------------------------

Old:
----
  streamz-0.6.3.tar.gz
  streamz-pr434-asyncdask.patch

New:
----
  streamz-0.6.4.tar.gz
  support-new-distributed.patch

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-streamz.spec ++++++
--- /var/tmp/diff_new_pack.obU3bd/_old  2022-08-25 15:09:29.913266365 +0200
+++ /var/tmp/diff_new_pack.obU3bd/_new  2022-08-25 15:09:29.917266374 +0200
@@ -19,17 +19,16 @@
 %{?!python_module:%define python_module() python3-%{**}}
 %define         skip_python2 1
 Name:           python-streamz
-Version:        0.6.3
+Version:        0.6.4
 Release:        0
 Summary:        Tool to build continuous data pipelines
 License:        BSD-3-Clause
-Group:          Development/Languages/Python
 URL:            https://github.com/python-streamz/streamz/
 Source:         
https://files.pythonhosted.org/packages/source/s/streamz/streamz-%{version}.tar.gz
-# PATCH-FIX-UPSTREAM streamz-pr434-asyncdask.patch -- gh#python-streamz/streamz#434, gh#python-streamz/streamz#439
-Patch1:         streamz-pr434-asyncdask.patch
 # PATCH-FIX-OPENSUSE streamz-opensuse-python-exec.patch -- call tests with correct flavor
-Patch2:         streamz-opensuse-python-exec.patch
+Patch0:         streamz-opensuse-python-exec.patch
+# PATCH-FIX-OPENSUSE New distributed now requires calling cleanup with loop
+Patch1:         support-new-distributed.patch
 BuildRequires:  %{python_module setuptools}
 BuildRequires:  fdupes
 BuildRequires:  python-rpm-macros
@@ -92,7 +91,7 @@
 fi
 # flaky: some tests are very fragile when run server-side
 donttest+=" or test_tcp"
-%pytest -m "not network" --asyncio-mode=auto --force-flaky --max-runs=10 --no-success-flaky-report -rsfE ${$python_flags} -k "not ($donttest)"
+%pytest -m "not network" --asyncio-mode=auto --force-flaky --max-runs=10 --no-success-flaky-report -rsfE ${$python_flags} --ignore streamz/tests/test_graph.py -k "not ($donttest)"
 
 %files %{python_files}
 %doc README.rst

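A note on the %pytest change above: besides renumbering the patches, the spec now skips streamz/tests/test_graph.py outright via --ignore. Roughly what the macro runs, expressed through pytest's Python entry point (illustrative only; the real macro expands per Python flavor, and $donttest collects the flaky test names):

    import pytest

    pytest.main([
        "-m", "not network",                        # skip tests marked as needing network
        "--asyncio-mode=auto",                      # let pytest-asyncio pick up async tests
        "--ignore", "streamz/tests/test_graph.py",  # graph tests fail oddly server-side
        "-k", "not (test_tcp)",                     # stand-in for the $donttest expression
    ])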
++++++ streamz-0.6.3.tar.gz -> streamz-0.6.4.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/PKG-INFO new/streamz-0.6.4/PKG-INFO
--- old/streamz-0.6.3/PKG-INFO  2021-10-04 16:47:26.663052300 +0200
+++ new/streamz-0.6.4/PKG-INFO  2022-07-27 20:07:33.221615000 +0200
@@ -1,37 +1,40 @@
-Metadata-Version: 1.2
+Metadata-Version: 2.1
 Name: streamz
-Version: 0.6.3
+Version: 0.6.4
 Summary: Streams
 Home-page: http://github.com/python-streamz/streamz/
 Maintainer: Matthew Rocklin
 Maintainer-email: mrock...@gmail.com
 License: BSD
-Description: Streamz
-        =======
-        
-        |Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI|
-        
-        Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.
-        
-        Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data.
-        
-        To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_.
-        
-        LICENSE
-        -------
-        
-        BSD-3 Clause
-        
-        .. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg
-           :target: https://github.com/python-streamz/streamz/actions
-        .. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest
-           :target: http://streamz.readthedocs.org/en/latest/
-           :alt: Documentation Status
-        .. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg
-           :target: https://pypi.python.org/pypi/streamz/
-        .. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green
-           :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
-        
 Keywords: streams
 Platform: UNKNOWN
-Requires-Python: >=3.6
+Requires-Python: >=3.7
+License-File: LICENSE.txt
+
+Streamz
+=======
+
+|Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI|
+
+Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.
+
+Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data.
+
+To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_.
+
+LICENSE
+-------
+
+BSD-3 Clause
+
+.. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg
+   :target: https://github.com/python-streamz/streamz/actions
+.. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest
+   :target: http://streamz.readthedocs.org/en/latest/
+   :alt: Documentation Status
+.. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg
+   :target: https://pypi.python.org/pypi/streamz/
+.. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green
+   :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
+
+
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/docs/source/api.rst new/streamz-0.6.4/docs/source/api.rst
--- old/streamz-0.6.3/docs/source/api.rst       2020-12-01 17:08:16.000000000 +0100
+++ new/streamz-0.6.4/docs/source/api.rst       2021-11-01 21:26:17.000000000 +0100
@@ -43,7 +43,11 @@
 .. automethod:: Stream.emit
 .. automethod:: Stream.frequencies
 .. automethod:: Stream.register_api
+.. automethod:: Stream.sink
 .. automethod:: Stream.sink_to_list
+.. automethod:: Stream.sink_to_textfile
+.. automethod:: Stream.to_websocket
+.. automethod:: Stream.to_mqtt
 .. automethod:: Stream.update
 .. automethod:: Stream.visualize
 
@@ -55,7 +59,9 @@
    filenames
    from_kafka
    from_kafka_batched
+   from_mqtt
    from_process
+   from_websocket
    from_textfile
    from_tcp
    from_http_server
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/docs/source/core.rst new/streamz-0.6.4/docs/source/core.rst
--- old/streamz-0.6.3/docs/source/core.rst      2020-09-25 16:18:17.000000000 +0200
+++ new/streamz-0.6.4/docs/source/core.rst      2021-12-30 22:18:19.000000000 +0100
@@ -297,7 +297,7 @@
 example to this is ``sink``, which is intended to be used with side effects and
 will stick around even without a reference.
 
-.. note:: Sink streams store themselves in ``streamz.core._global_sinks``.  You
+.. note:: Sink streams store themselves in ``streamz.sinks._global_sinks``.  You
           can remove them permanently by clearing that collection.
 
 .. code-block:: python
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/setup.py new/streamz-0.6.4/setup.py
--- old/streamz-0.6.3/setup.py  2021-10-04 16:46:46.000000000 +0200
+++ new/streamz-0.6.4/setup.py  2022-07-27 20:06:45.000000000 +0200
@@ -9,7 +9,7 @@
 
 
 setup(name='streamz',
-      version='0.6.3',
+      version='0.6.4',
       description='Streams',
       url='http://github.com/python-streamz/streamz/',
       maintainer='Matthew Rocklin',
@@ -17,7 +17,7 @@
       license='BSD',
       keywords='streams',
       packages=packages + tests,
-      python_requires='>=3.6',
+      python_requires='>=3.7',
       long_description=(open('README.rst').read() if exists('README.rst')
                         else '')),
       install_requires=list(open('requirements.txt').read().strip().split('\n')),
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/__init__.py new/streamz-0.6.4/streamz/__init__.py
--- old/streamz-0.6.3/streamz/__init__.py       2021-10-04 16:46:46.000000000 +0200
+++ new/streamz-0.6.4/streamz/__init__.py       2022-07-27 20:06:14.000000000 +0200
@@ -13,4 +13,4 @@
 except ImportError:
     pass
 
-__version__ = '0.6.3'
+__version__ = '0.6.4'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/core.py new/streamz-0.6.4/streamz/core.py
--- old/streamz-0.6.3/streamz/core.py   2021-09-11 20:05:05.000000000 +0200
+++ new/streamz-0.6.4/streamz/core.py   2021-12-30 22:18:19.000000000 +0100
@@ -1,3 +1,4 @@
+import asyncio
 from collections import deque, defaultdict
 from datetime import timedelta
 import functools
@@ -485,15 +486,14 @@
             finally:
                 thread_state.asynchronous = ts_async
         else:
-            @gen.coroutine
-            def _():
+            async def _():
                 thread_state.asynchronous = True
                 try:
-                    result = yield self._emit(x, metadata=metadata)
+                    result = await asyncio.gather(*self._emit(x, metadata=metadata))
                 finally:
                     del thread_state.asynchronous
+                return result
 
-                raise gen.Return(result)
             sync(self.loop, _)
 
     def update(self, x, who=None, metadata=None):
@@ -1902,89 +1902,6 @@
             yield self._emit(x, self.next_metadata)
 
 
-@Stream.register_api()
-class to_kafka(Stream):
-    """ Writes data in the stream to Kafka
-
-    This stream accepts a string or bytes object. Call ``flush`` to ensure all
-    messages are pushed. Responses from Kafka are pushed downstream.
-
-    Parameters
-    ----------
-    topic : string
-        The topic which to write
-    producer_config : dict
-        Settings to set up the stream, see
-        https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
-        https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-        Examples:
-        bootstrap.servers: Connection string (host:port) to Kafka
-
-    Examples
-    --------
-    >>> from streamz import Stream
-    >>> ARGS = {'bootstrap.servers': 'localhost:9092'}
-    >>> source = Stream()
-    >>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
-    <to_kafka>
-    >>> for i in range(10):
-    ...     source.emit(i)
-    >>> kafka.flush()
-    """
-    def __init__(self, upstream, topic, producer_config, **kwargs):
-        import confluent_kafka as ck
-
-        self.topic = topic
-        self.producer = ck.Producer(producer_config)
-
-        kwargs["ensure_io_loop"] = True
-        Stream.__init__(self, upstream, **kwargs)
-        self.stopped = False
-        self.polltime = 0.2
-        self.loop.add_callback(self.poll)
-        self.futures = []
-
-    @gen.coroutine
-    def poll(self):
-        while not self.stopped:
-            # executes callbacks for any delivered data, in this thread
-            # if no messages were sent, nothing happens
-            self.producer.poll(0)
-            yield gen.sleep(self.polltime)
-
-    def update(self, x, who=None, metadata=None):
-        future = gen.Future()
-        self.futures.append(future)
-
-        @gen.coroutine
-        def _():
-            while True:
-                try:
-                    # this runs asynchronously, in C-K's thread
-                    self.producer.produce(self.topic, x, callback=self.cb)
-                    return
-                except BufferError:
-                    yield gen.sleep(self.polltime)
-                except Exception as e:
-                    future.set_exception(e)
-                    return
-
-        self.loop.add_callback(_)
-        return future
-
-    @gen.coroutine
-    def cb(self, err, msg):
-        future = self.futures.pop(0)
-        if msg is not None and msg.value() is not None:
-            future.set_result(None)
-            yield self._emit(msg.value())
-        else:
-            future.set_exception(err or msg.error())
-
-    def flush(self, timeout=-1):
-        self.producer.flush(timeout)
-
-
 def sync(loop, func, *args, **kwargs):
     """
     Run coroutine in loop running in separate thread.
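The emit() hunk above is the heart of the Tornado-to-asyncio migration in core.py: a @gen.coroutine using yield and raise gen.Return becomes a native coroutine, and the collection of futures returned by _emit() now needs an explicit asyncio.gather. A minimal before/after sketch of the pattern (simplified, not the full Stream.emit body):

    import asyncio

    from tornado import gen

    @gen.coroutine
    def old_style(futures):
        # Tornado coroutines may yield a list of futures directly
        result = yield futures
        raise gen.Return(result)

    async def new_style(futures):
        # native coroutines must gather multiple awaitables explicitly
        return await asyncio.gather(*futures)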
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/river.py new/streamz-0.6.4/streamz/river.py
--- old/streamz-0.6.3/streamz/river.py  1970-01-01 01:00:00.000000000 +0100
+++ new/streamz-0.6.4/streamz/river.py  2021-12-30 22:18:19.000000000 +0100
@@ -0,0 +1,62 @@
+from . import Stream
+
+
+# TODO: most river classes support batches, e.g., learn_many, more efficiently
+
+
+class RiverTransform(Stream):
+    """Pass data through one or more River transforms"""
+
+    def __init__(self, model, **kwargs):
+        super().__init__(**kwargs)
+        self.model = model
+
+    def update(self, x, who=None, metadata=None):
+        out = self.model.transform_one(*x)
+        self.emit(out)
+
+
+class RiverTrain(Stream):
+
+    def __init__(self, model, metric=None, pass_model=False, **kwargs):
+        """
+
+        If metric and pass_model are both defaults, this is effectively
+        a sink.
+
+        :param model: river model or pipeline
+        :param metric: river metric
+            If given, it is emitted on every sample
+        :param pass_model: bool
+            If True, the (updated) model if emitted for each sample
+        """
+        super().__init__(**kwargs)
+        self.model = model
+        if pass_model and metric is not None:
+            raise TypeError
+        self.pass_model = pass_model
+        self.metric = metric
+
+    def update(self, x, who=None, metadata=None):
+        """
+        :param x: tuple
+            (x, [y[, w]) floats for single sample. Include
+        """
+        self.model.learn_one(*x)
+        if self.metric:
+            yp = self.model.predict_one(x[0])
+            weights = x[2] if len(x) > 1 else 1.0
+            return self._emit(self.metric.update(x[1], yp, weights).get(), metadata=metadata)
+        if self.pass_model:
+            return self._emit(self.model, metadata=metadata)
+
+
+class RiverPredict(Stream):
+
+    def __init__(self, model, **kwargs):
+        super().__init__(**kwargs)
+        self.model = model
+
+    def update(self, x, who=None, metadata=None):
+        out = self.model.predict_one(x)
+        return self._emit(out, metadata=metadata)
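streamz/river.py is new in 0.6.4 and undocumented upstream, so here is a hedged usage sketch. It assumes river's learn_one/predict_one API; the classes are plain Stream subclasses (not registered via @Stream.register_api()), so the upstream node is passed as a keyword:

    from river import linear_model
    from streamz import Stream
    from streamz.river import RiverTrain, RiverPredict

    model = linear_model.LinearRegression()

    train_src = Stream()
    # metric=None and pass_model=False: the node acts as a training sink
    RiverTrain(model, upstream=train_src)

    predict_src = Stream()
    preds = RiverPredict(model, upstream=predict_src).sink_to_list()

    train_src.emit(({"x": 1.0}, 2.0))   # calls model.learn_one({"x": 1.0}, 2.0)
    predict_src.emit({"x": 1.5})        # prediction collected in preds

Note that the metric branch above indexes x[2] whenever len(x) > 1, so pass (x, y, w) triples if you supply a metric.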
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/sinks.py new/streamz-0.6.4/streamz/sinks.py
--- old/streamz-0.6.3/streamz/sinks.py  2021-05-18 23:35:26.000000000 +0200
+++ new/streamz-0.6.4/streamz/sinks.py  2021-11-01 22:36:12.000000000 +0100
@@ -4,6 +4,7 @@
 from tornado import gen
 
 from streamz import Stream
+from streamz.core import sync
 
 # sinks add themselves here to avoid being garbage-collected
 _global_sinks = set()
@@ -109,3 +110,164 @@
 
     def update(self, x, who=None, metadata=None):
         self._fp.write(x + self._end)
+
+
+@Stream.register_api()
+class to_kafka(Stream):
+    """ Writes data in the stream to Kafka
+
+    This stream accepts a string or bytes object. Call ``flush`` to ensure all
+    messages are pushed. Responses from Kafka are pushed downstream.
+
+    Parameters
+    ----------
+    topic : string
+        The topic which to write
+    producer_config : dict
+        Settings to set up the stream, see
+        https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
+        https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+        Examples:
+        bootstrap.servers: Connection string (host:port) to Kafka
+
+    Examples
+    --------
+    >>> from streamz import Stream
+    >>> ARGS = {'bootstrap.servers': 'localhost:9092'}
+    >>> source = Stream()
+    >>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
+    <to_kafka>
+    >>> for i in range(10):
+    ...     source.emit(i)
+    >>> kafka.flush()
+    """
+    def __init__(self, upstream, topic, producer_config, **kwargs):
+        import confluent_kafka as ck
+
+        self.topic = topic
+        self.producer = ck.Producer(producer_config)
+
+        kwargs["ensure_io_loop"] = True
+        Stream.__init__(self, upstream, **kwargs)
+        self.stopped = False
+        self.polltime = 0.2
+        self.loop.add_callback(self.poll)
+        self.futures = []
+
+    @gen.coroutine
+    def poll(self):
+        while not self.stopped:
+            # executes callbacks for any delivered data, in this thread
+            # if no messages were sent, nothing happens
+            self.producer.poll(0)
+            yield gen.sleep(self.polltime)
+
+    def update(self, x, who=None, metadata=None):
+        future = gen.Future()
+        self.futures.append(future)
+
+        @gen.coroutine
+        def _():
+            while True:
+                try:
+                    # this runs asynchronously, in C-K's thread
+                    self.producer.produce(self.topic, x, callback=self.cb)
+                    return
+                except BufferError:
+                    yield gen.sleep(self.polltime)
+                except Exception as e:
+                    future.set_exception(e)
+                    return
+
+        self.loop.add_callback(_)
+        return future
+
+    @gen.coroutine
+    def cb(self, err, msg):
+        future = self.futures.pop(0)
+        if msg is not None and msg.value() is not None:
+            future.set_result(None)
+            yield self._emit(msg.value())
+        else:
+            future.set_exception(err or msg.error())
+
+    def flush(self, timeout=-1):
+        self.producer.flush(timeout)
+
+
+@Stream.register_api()
+class to_websocket(Sink):
+    """Write bytes data to websocket
+
+    The websocket will be opened on first call, and kept open. Should
+    it close at some point, future writes will fail.
+
+    Requires the ``websockets`` package.
+
+    :param uri: str
+        Something like "ws://host:port". Use "wss:" to allow TLS.
+    :param ws_kwargs: dict
+        Further kwargs to pass to ``websockets.connect``, please
+        read its documentation.
+    :param kwargs:
+        Passed to superclass
+    """
+
+    def __init__(self, upstream, uri, ws_kwargs=None, **kwargs):
+        self.uri = uri
+        self.ws_kw = ws_kwargs
+        self.ws = None
+        super().__init__(upstream, ensure_io_loop=True, **kwargs)
+
+    async def update(self, x, who=None, metadata=None):
+        import websockets
+        if self.ws is None:
+            self.ws = await websockets.connect(self.uri, **(self.ws_kw or {}))
+        await self.ws.send(x)
+
+    def destroy(self):
+        super().destroy()
+        if self.ws is not None:
+            sync(self.loop, self.ws.protocol.close)
+        self.ws = None
+
+
+@Stream.register_api()
+class to_mqtt(Sink):
+    """
+    Send data to MQTT broker
+
+    See also ``sources.from_mqtt``.
+
+    Requires ``paho.mqtt``
+
+    :param host: str
+    :param port: int
+    :param topic: str
+    :param keepalive: int
+        See mqtt docs - to keep the channel alive
+    :param client_kwargs:
+        Passed to the client's ``connect()`` method
+    """
+    def __init__(self, upstream, host, port, topic, keepalive=60, client_kwargs=None,
+                 **kwargs):
+        self.host = host
+        self.port = port
+        self.c_kw = client_kwargs or {}
+        self.client = None
+        self.topic = topic
+        self.keepalive = keepalive
+        super().__init__(upstream, ensure_io_loop=True, **kwargs)
+
+    def update(self, x, who=None, metadata=None):
+        import paho.mqtt.client as mqtt
+        if self.client is None:
+            self.client = mqtt.Client()
+            self.client.connect(self.host, self.port, self.keepalive, **self.c_kw)
+        # TODO: wait on successful delivery
+        self.client.publish(self.topic, x)
+
+    def destroy(self):
+        self.client.disconnect()
+        self.client = None
+        super().destroy()
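The new to_websocket/to_mqtt sinks mirror the roundtrip tests added further down; a minimal sketch (requires the websockets or paho-mqtt package respectively, plus a reachable endpoint):

    from streamz import Stream

    s = Stream.from_iterable([b"0123"] * 4)
    s.to_websocket("ws://localhost:8989")   # the socket opens on the first message
    # or: s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
    s.start()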
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/sources.py new/streamz-0.6.4/streamz/sources.py
--- old/streamz-0.6.3/streamz/sources.py        2021-05-18 23:35:26.000000000 +0200
+++ new/streamz-0.6.4/streamz/sources.py        2021-11-02 14:02:22.000000000 +0100
@@ -1,11 +1,12 @@
 import asyncio
 from glob import glob
+import queue
 import os
 import time
 from tornado import gen
 import weakref
 
-from .core import Stream, convert_interval, RefCounter
+from .core import Stream, convert_interval, RefCounter, sync
 
 
def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False):
@@ -448,10 +449,14 @@
             self.stopped = False
             self.consumer = ck.Consumer(self.cpars)
             self.consumer.subscribe(self.topics)
-            weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer))
+            weakref.finalize(
+                self, lambda consumer=self.consumer: _close_consumer(consumer)
+            )
             tp = ck.TopicPartition(self.topics[0], 0, 0)
 
-            # blocks for consumer thread to come up
+            # blocks for consumer thread to come up and invoke poll to
+            # establish connection with broker to fetch oauth token for kafka
+            self.consumer.poll(timeout=1)
             self.consumer.get_watermark_offsets(tp)
             self.loop.add_callback(self.poll_kafka)
 
@@ -478,7 +483,8 @@
                  max_batch_size=10000, keys=False,
                  engine=None, **kwargs):
         self.consumer_params = consumer_params
-        # Override the auto-commit config to enforce custom streamz checkpointing
+        # Override the auto-commit config to enforce custom streamz
+        # checkpointing
         self.consumer_params['enable.auto.commit'] = 'false'
         if 'auto.offset.reset' not in self.consumer_params.keys():
             consumer_params['auto.offset.reset'] = 'latest'
@@ -586,7 +592,9 @@
             self.stopped = False
             tp = ck.TopicPartition(self.topic, 0, 0)
 
-            # blocks for consumer thread to come up
+            # blocks for consumer thread to come up and invoke poll to establish
+            # connection with broker to fetch oauth token for kafka
+            self.consumer.poll(timeout=1)
             self.consumer.get_watermark_offsets(tp)
             self.loop.add_callback(self.poll_kafka)
 
@@ -779,3 +787,135 @@
                 break
             await asyncio.gather(*self._emit(x))
         self.stopped = True
+
+
+@Stream.register_api()
+class from_websocket(Source):
+    """Read binary data from a websocket
+
+    This source will accept connections on a given port and handle messages
+    coming in.
+
+    The websockets library must be installed.
+
+    :param host: str
+        Typically "localhost"
+    :param port: int
+        Which port to listen on (must be available)
+    :param serve_kwargs: dict
+        Passed to ``websockets.serve``
+    :param kwargs:
+        Passed to superclass
+    """
+
+    def __init__(self, host, port, serve_kwargs=None, **kwargs):
+        self.host = host
+        self.port = port
+        self.s_kw = serve_kwargs
+        self.server = None
+        super().__init__(**kwargs)
+
+    @gen.coroutine
+    def _read(self, ws, path):
+        while not self.stopped:
+            data = yield ws.recv()
+            yield self._emit(data)
+
+    async def run(self):
+        import websockets
+        self.server = await websockets.serve(
+            self._read, self.host, self.port, **(self.s_kw or {})
+        )
+
+    def stop(self):
+        self.server.close()
+        sync(self.loop, self.server.wait_closed)
+
+
+@Stream.register_api()
+class from_q(Source):
+    """Source events from a threading.Queue, running another event framework
+
+    The queue is polled, i.e., there is a latency/overhead tradeoff, since
+    we cannot use ``await`` directly with a multithreaded queue.
+
+    Allows mixing of another event loop, for example pyqt, on another thread.
+    Note that, by default, a streamz.Source such as this one will start
+    an event loop in a new thread, unless otherwise specified.
+    """
+
+    def __init__(self, q, sleep_time=0.01, **kwargs):
+        """
+        :param q: threading.Queue
+            Any items pushed into here will become streamz events
+        :param sleep_time: int
+            Sets how long we wait before checking the input queue when
+            empty (in s)
+        :param kwargs:
+            passed to streamz.Source
+        """
+        self.q = q
+        self.sleep = sleep_time
+        super().__init__(**kwargs)
+
+    async def _run(self):
+        """Poll threading queue for events
+        This uses check-and-wait, but overhead is low. Could maybe have
+        a sleep-free version with an threading.Event.
+        """
+        try:
+            out = self.q.get_nowait()
+            await self.emit(out, asynchronous=True)
+        except queue.Empty:
+            await asyncio.sleep(self.sleep)
+
+
+@Stream.register_api()
+class from_mqtt(from_q):
+    """Read from MQTT source
+
+    See https://en.wikipedia.org/wiki/MQTT for a description of the protocol
+    and its uses.
+
+    See also ``sinks.to_mqtt``.
+
+    Requires ``paho.mqtt``
+
+    The outputs are ``paho.mqtt.client.MQTTMessage`` instances, which each have
+    attributes timestamp, payload, topic, ...
+
+    NB: paho.mqtt.python runs on its own thread in this implementation. We may
+    wish to instead call client.loop() directly
+
+    :param host: str
+    :param port: int
+    :param topic: str
+        (May in the future support a list of topics)
+    :param keepalive: int
+        See mqtt docs - to keep the channel alive
+    :param client_kwargs:
+        Passed to the client's ``connect()`` method
+    """
+    def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs):
+        self.host = host
+        self.port = port
+        self.keepalive = keepalive
+        self.topic = topic
+        self.client_kwargs = client_kwargs
+        super().__init__(q=queue.Queue(), **kwargs)
+
+    def _on_connect(self, client, userdata, flags, rc):
+        client.subscribe(self.topic)
+
+    def _on_message(self, client, userdata, msg):
+        self.q.put(msg)
+
+    async def run(self):
+        import paho.mqtt.client as mqtt
+        client = mqtt.Client()
+        client.on_connect = self._on_connect
+        client.on_message = self._on_message
+        client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))
+        client.loop_start()
+        await super().run()
+        client.disconnect()
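And the receiving side, matching the new from_mqtt source above (from_websocket is analogous, taking a host/port pair). The emitted items are paho MQTTMessage objects, hence the .payload access:

    from streamz import Stream

    s0 = Stream.from_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
    payloads = s0.map(lambda msg: msg.payload).sink_to_list()
    s0.start()
    # ... later: s0.stop()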
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/tests/test_dask.py new/streamz-0.6.4/streamz/tests/test_dask.py
--- old/streamz-0.6.3/streamz/tests/test_dask.py        2021-05-18 23:35:26.000000000 +0200
+++ new/streamz-0.6.4/streamz/tests/test_dask.py        2021-12-30 22:18:19.000000000 +0100
@@ -1,3 +1,4 @@
+import asyncio
 from operator import add
 import random
 import time
@@ -16,21 +17,21 @@
 
 
 @gen_cluster(client=True)
-def test_map(c, s, a, b):
+async def test_map(c, s, a, b):
     source = Stream(asynchronous=True)
     futures = scatter(source).map(inc)
     futures_L = futures.sink_to_list()
     L = futures.gather().sink_to_list()
 
     for i in range(5):
-        yield source.emit(i)
+        await source.emit(i)
 
     assert L == [1, 2, 3, 4, 5]
     assert all(isinstance(f, Future) for f in futures_L)
 
 
 @gen_cluster(client=True)
-def test_map_on_dict(c, s, a, b):
+async def test_map_on_dict(c, s, a, b):
     # dask treats dicts differently, so we have to make sure
     # the user sees no difference in the streamz api.
     # Regression test against #336
@@ -43,7 +44,7 @@
     L = futures.gather().sink_to_list()
 
     for i in range(5):
-        yield source.emit({"i": i})
+        await source.emit({"i": i})
 
     assert len(L) == 5
     for i, item in enumerate(sorted(L, key=lambda x: x["x"])):
@@ -52,7 +53,7 @@
 
 
 @gen_cluster(client=True)
-def test_partition_then_scatter_async(c, s, a, b):
+async def test_partition_then_scatter_async(c, s, a, b):
     # Ensure partition w/ timeout before scatter works correctly for
     # asynchronous
     start = time.monotonic()
@@ -63,10 +64,10 @@
 
     rc = RefCounter(loop=source.loop)
     for i in range(3):
-        yield source.emit(i, metadata=[{'ref': rc}])
+        await source.emit(i, metadata=[{'ref': rc}])
 
     while rc.count != 0 and time.monotonic() - start < 1.:
-        yield gen.sleep(1e-2)
+        await gen.sleep(1e-2)
 
     assert L == [1, 2, 3]
 
@@ -92,7 +93,7 @@
 
 
 @gen_cluster(client=True)
-def test_non_unique_emit(c, s, a, b):
+async def test_non_unique_emit(c, s, a, b):
     """Regression for https://github.com/python-streamz/streams/issues/397
 
     Non-unique stream entries still need to each be processed.
@@ -103,28 +104,28 @@
 
     for _ in range(3):
         # Emit non-unique values
-        yield source.emit(0)
+        await source.emit(0)
 
     assert len(L) == 3
     assert L[0] != L[1] or L[0] != L[2]
 
 
 @gen_cluster(client=True)
-def test_scan(c, s, a, b):
+async def test_scan(c, s, a, b):
     source = Stream(asynchronous=True)
     futures = scatter(source).map(inc).scan(add)
     futures_L = futures.sink_to_list()
     L = futures.gather().sink_to_list()
 
     for i in range(5):
-        yield source.emit(i)
+        await source.emit(i)
 
     assert L == [1, 3, 6, 10, 15]
     assert all(isinstance(f, Future) for f in futures_L)
 
 
 @gen_cluster(client=True)
-def test_scan_state(c, s, a, b):
+async def test_scan_state(c, s, a, b):
     source = Stream(asynchronous=True)
 
     def f(acc, i):
@@ -133,33 +134,33 @@
 
     L = scatter(source).scan(f, returns_state=True).gather().sink_to_list()
     for i in range(3):
-        yield source.emit(i)
+        await source.emit(i)
 
     assert L == [0, 1, 3]
 
 
 @gen_cluster(client=True)
-def test_zip(c, s, a, b):
+async def test_zip(c, s, a, b):
     a = Stream(asynchronous=True)
     b = Stream(asynchronous=True)
     c = scatter(a).zip(scatter(b))
 
     L = c.gather().sink_to_list()
 
-    yield a.emit(1)
-    yield b.emit('a')
-    yield a.emit(2)
-    yield b.emit('b')
+    await a.emit(1)
+    await b.emit('a')
+    await a.emit(2)
+    await b.emit('b')
 
     assert L == [(1, 'a'), (2, 'b')]
 
 
 @gen_cluster(client=True)
-def test_accumulate(c, s, a, b):
+async def test_accumulate(c, s, a, b):
     source = Stream(asynchronous=True)
     L = source.scatter().accumulate(lambda acc, x: acc + x, with_state=True).gather().sink_to_list()
     for i in range(3):
-        yield source.emit(i)
+        await source.emit(i)
     assert L[-1][1] == 3
 
 
@@ -169,10 +170,9 @@
             source = Stream()
             L = source.scatter().map(inc).gather().sink_to_list()
 
-            @gen.coroutine
-            def f():
+            async def f():
                 for i in range(10):
-                    yield source.emit(i, asynchronous=True)
+                    await source.emit(i, asynchronous=True)
 
             sync(loop, f)
 
@@ -193,24 +193,24 @@
 
 
 @gen_cluster(client=True, nthreads=[('127.0.0.1', 1)] * 2)
-def test_buffer(c, s, a, b):
+async def test_buffer(c, s, a, b):
     source = Stream(asynchronous=True)
     L = source.scatter().map(slowinc, delay=0.5).buffer(5).gather().sink_to_list()
 
     start = time.time()
     for i in range(5):
-        yield source.emit(i)
+        await source.emit(i)
     end = time.time()
     assert end - start < 0.5
 
     for i in range(5, 10):
-        yield source.emit(i)
+        await source.emit(i)
 
     end2 = time.time()
     assert end2 - start > (0.5 / 3)
 
     while len(L) < 10:
-        yield gen.sleep(0.01)
+        await gen.sleep(0.01)
         assert time.time() - start < 5
 
     assert L == list(map(inc, range(10)))
@@ -242,7 +242,7 @@
 
 
 @pytest.mark.xfail(reason='')
-def test_stream_shares_client_loop(loop):  # noqa: F811
+async def test_stream_shares_client_loop(loop):  # noqa: F811
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as client:  # noqa: F841
             source = Stream()
@@ -251,7 +251,7 @@
 
 
 @gen_cluster(client=True)
-def test_starmap(c, s, a, b):
+async def test_starmap(c, s, a, b):
     def add(x, y, z=0):
         return x + y + z
 
@@ -259,6 +259,6 @@
     L = source.scatter().starmap(add, z=10).gather().sink_to_list()
 
     for i in range(5):
-        yield source.emit((i, i))
+        await source.emit((i, i))
 
     assert L == [10, 12, 14, 16, 18]
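All of the test_dask.py edits above are one mechanical change: distributed's gen_cluster can drive native coroutines, so each Tornado-style generator test becomes async def and yield becomes await. A condensed sketch of the converted shape (scatter imported as the tests use it):

    from distributed.utils_test import gen_cluster, inc
    from streamz import Stream
    from streamz.dask import scatter  # import path assumed from the tests

    @gen_cluster(client=True)
    async def test_map_sketch(c, s, a, b):
        source = Stream(asynchronous=True)
        L = scatter(source).map(inc).gather().sink_to_list()
        for i in range(3):
            await source.emit(i)   # was: yield source.emit(i)
        assert L == [1, 2, 3]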
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/tests/test_kafka.py new/streamz-0.6.4/streamz/tests/test_kafka.py
--- old/streamz-0.6.3/streamz/tests/test_kafka.py       2021-05-18 23:35:26.000000000 +0200
+++ new/streamz-0.6.4/streamz/tests/test_kafka.py       2022-07-27 20:06:05.000000000 +0200
@@ -1,3 +1,4 @@
+import asyncio
 import atexit
 from contextlib import contextmanager
 from flaky import flaky
@@ -217,7 +218,7 @@
 
 
 @gen_cluster(client=True, timeout=60)
-def test_kafka_dask_batch(c, s, w1, w2):
+async def test_kafka_dask_batch(c, s, w1, w2):
     j = random.randint(0, 10000)
     ARGS = {'bootstrap.servers': 'localhost:9092',
             'group.id': 'streamz-test%i' % j}
@@ -227,15 +228,15 @@
                                            asynchronous=True, dask=True)
         out = stream.gather().sink_to_list()
         stream.start()
-        yield gen.sleep(5)  # this frees the loop while dask workers report in
+        await asyncio.sleep(5)  # this frees the loop while dask workers report in
         assert isinstance(stream, DaskStream)
         for i in range(10):
             kafka.produce(TOPIC, b'value-%d' % i)
         kafka.flush()
-        yield await_for(lambda: any(out), 10, period=0.2)
+        await await_for(lambda: any(out), 10, period=0.2)
         assert {'key': None, 'value': b'value-1'} in out[0]
         stream.stop()
-        yield gen.sleep(0)
+        await asyncio.sleep(0)
         stream.upstream.upstream.consumer.close()
 
 
@@ -382,7 +383,7 @@
 
 
 @gen_cluster(client=True, timeout=60)
-def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
+async def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
     '''
     Testing whether Dask's scatter and gather works in conformity with
     the reference counting checkpointing implementation.
@@ -403,23 +404,23 @@
             kafka.produce(TOPIC, b'value-%d' % i)
         kafka.flush()
         stream1 = Stream.from_kafka_batched(TOPIC, ARGS1, asynchronous=True,
-                                           dask=True)
+                                            dask=True)
        out1 = stream1.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
         stream1.start()
-        yield await_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2)
+        await await_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2)
         stream1.upstream.stopped = True
         stream2 = Stream.from_kafka_batched(TOPIC, ARGS1, asynchronous=True,
-                                           dask=True)
+                                            dask=True)
        out2 = stream2.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
         stream2.start()
         time.sleep(5)
         assert len(out2) == 0
         stream2.upstream.stopped = True
         stream3 = Stream.from_kafka_batched(TOPIC, ARGS2, asynchronous=True,
-                                           dask=True)
+                                            dask=True)
        out3 = stream3.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
         stream3.start()
-        yield await_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2)
+        await await_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2)
         stream3.upstream.stopped = True
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz/tests/test_sinks.py new/streamz-0.6.4/streamz/tests/test_sinks.py
--- old/streamz-0.6.3/streamz/tests/test_sinks.py       2021-05-18 23:35:26.000000000 +0200
+++ new/streamz-0.6.4/streamz/tests/test_sinks.py       2022-07-27 20:06:05.000000000 +0200
@@ -3,7 +3,7 @@
 import pytest
 from streamz import Stream
 from streamz.sinks import _global_sinks, Sink
-from streamz.utils_test import tmpfile
+from streamz.utils_test import tmpfile, wait_for
 
 
 def test_sink_with_args_and_kwargs():
@@ -71,3 +71,35 @@
     del sink
 
     assert ref() is None
+
+
+def test_ws_roundtrip():
+    pytest.importorskip("websockets")
+    s0 = Stream.from_websocket("localhost", 8989, start=True)
+    l = s0.sink_to_list()
+
+    data = [b'0123'] * 4
+    s = Stream.from_iterable(data)
+    s.to_websocket("ws://localhost:8989")
+    s.start()
+
+    wait_for(lambda: data == l, timeout=1)
+    s.stop()
+    s0.stop()
+
+
+def test_mqtt_roundtrip():
+    pytest.importorskip("paho.mqtt.client")
+    s0 = Stream.from_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
+    l = s0.map(lambda msg: msg.payload).sink_to_list()
+    s0.start()
+
+    data = [b'0123'] * 4
+    s = Stream.from_iterable(data)
+    s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
+    s.start()
+
+    wait_for(lambda: data == l, timeout=1)
+    s.stop()
+    s0.stop()
+
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz.egg-info/PKG-INFO new/streamz-0.6.4/streamz.egg-info/PKG-INFO
--- old/streamz-0.6.3/streamz.egg-info/PKG-INFO 2021-10-04 16:47:26.000000000 +0200
+++ new/streamz-0.6.4/streamz.egg-info/PKG-INFO 2022-07-27 20:07:32.000000000 +0200
@@ -1,37 +1,40 @@
-Metadata-Version: 1.2
+Metadata-Version: 2.1
 Name: streamz
-Version: 0.6.3
+Version: 0.6.4
 Summary: Streams
 Home-page: http://github.com/python-streamz/streamz/
 Maintainer: Matthew Rocklin
 Maintainer-email: mrock...@gmail.com
 License: BSD
-Description: Streamz
-        =======
-        
-        |Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI|
-        
-        Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.
-        
-        Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data.
-        
-        To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_.
-        
-        LICENSE
-        -------
-        
-        BSD-3 Clause
-        
-        .. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg
-           :target: https://github.com/python-streamz/streamz/actions
-        .. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest
-           :target: http://streamz.readthedocs.org/en/latest/
-           :alt: Documentation Status
-        .. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg
-           :target: https://pypi.python.org/pypi/streamz/
-        .. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green
-           :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
-        
 Keywords: streams
 Platform: UNKNOWN
-Requires-Python: >=3.6
+Requires-Python: >=3.7
+License-File: LICENSE.txt
+
+Streamz
+=======
+
+|Build Status| |Doc Status| |Version Status| |RAPIDS custreamz gpuCI|
+
+Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.
+
+Optionally, Streamz can also work with both `Pandas <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ and `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_ dataframes, to provide sensible streaming operations on continuous tabular data.
+
+To learn more about how to use Streamz see documentation at `streamz.readthedocs.org <https://streamz.readthedocs.org>`_.
+
+LICENSE
+-------
+
+BSD-3 Clause
+
+.. |Build Status| image:: https://github.com/python-streamz/streamz/workflows/CI/badge.svg
+   :target: https://github.com/python-streamz/streamz/actions
+.. |Doc Status| image:: http://readthedocs.org/projects/streamz/badge/?version=latest
+   :target: http://streamz.readthedocs.org/en/latest/
+   :alt: Documentation Status
+.. |Version Status| image:: https://img.shields.io/pypi/v/streamz.svg
+   :target: https://pypi.python.org/pypi/streamz/
+.. |RAPIDS custreamz gpuCI| image:: https://img.shields.io/badge/gpuCI-custreamz-green
+   :target: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py
+
+
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.6.3/streamz.egg-info/SOURCES.txt new/streamz-0.6.4/streamz.egg-info/SOURCES.txt
--- old/streamz-0.6.3/streamz.egg-info/SOURCES.txt      2021-10-04 16:47:26.000000000 +0200
+++ new/streamz-0.6.4/streamz.egg-info/SOURCES.txt      2022-07-27 20:07:33.000000000 +0200
@@ -31,6 +31,7 @@
 streamz/graph.py
 streamz/orderedweakset.py
 streamz/plugins.py
+streamz/river.py
 streamz/sinks.py
 streamz/sources.py
 streamz/utils.py

++++++ support-new-distributed.patch ++++++
Index: streamz-0.6.4/streamz/tests/test_dask.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_dask.py
+++ streamz-0.6.4/streamz/tests/test_dask.py
@@ -13,7 +13,7 @@ from streamz import RefCounter, Stream
 
 from distributed import Future, Client
 from distributed.utils import sync
-from distributed.utils_test import gen_cluster, inc, cluster, loop, slowinc  # noqa: F401
+from distributed.utils_test import cleanup, gen_cluster, inc, cluster, loop, slowinc  # noqa: F401
 
 
 @gen_cluster(client=True)
Index: streamz-0.6.4/streamz/tests/test_core.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_core.py
+++ streamz-0.6.4/streamz/tests/test_core.py
@@ -19,7 +19,7 @@ from streamz import RefCounter
 from streamz.sources import sink_to_file
 from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger,   # noqa: F401
         clean, await_for, metadata, wait_for)  # noqa: F401
-from distributed.utils_test import loop   # noqa: F401
+from distributed.utils_test import cleanup, loop   # noqa: F401
 
 
 def test_basic():
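The patch only widens the fixture imports, but that is the whole fix: newer distributed makes its loop fixture depend on a cleanup fixture, so any test module importing loop must also import cleanup for pytest to resolve the chain. A hedged illustration:

    # both names are pytest fixtures; importing them registers them in the module
    from distributed.utils_test import cleanup, loop  # noqa: F401

    def test_uses_loop(loop):
        # without `cleanup` importable here, pytest errors with
        # "fixture 'cleanup' not found" while setting up `loop`
        assert loop is not None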
