This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72f4b7b1e0 Fix(#3545): Fix online ml functions (#3803)
72f4b7b1e0 is described below

commit 72f4b7b1e0a51fc5ced2c2c49073835cc4d5993b
Author: Sven Oehler <[email protected]>
AuthorDate: Thu Oct 2 15:14:40 2025 +0200

    Fix(#3545): Fix online ml functions (#3803)
    
    Co-authored-by: Dominik Riemer <[email protected]>
---
 ...ine-learning-on-a-streampipes-data-stream.ipynb | 49 ++++++++++++----------
 .../streampipes/function_zoo/river_function.py     |  9 ++--
 .../functions/utils/data_stream_generator.py       | 10 +++--
 3 files changed, 40 insertions(+), 28 deletions(-)

diff --git 
a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
 
b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
index adcc24363e..26fb9de7fe 100644
--- 
a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
+++ 
b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
@@ -29,7 +29,7 @@
    "outputs": [],
    "source": [
     "# you can install all required dependencies for this tutorial by 
executing the following command\n",
-    "%pip install river streampipes"
+    "%pip install river streampipes graphviz"
    ]
   },
   {
@@ -281,7 +281,11 @@
     ")\n",
     "\n",
     "clustering = OnlineML(\n",
-    "    client=client, stream_ids=[\"sp:spdatastream:xboBFK\"], 
model=k_means, prediction_type=RuntimeType.INTEGER.value\n",
+    "    client=client,\n",
+    "    stream_ids=[\"Enter stream id here\"],\n",
+    "    output_stream_name=\"KMeans Prediction\",\n",
+    "    model=k_means,\n",
+    "    prediction_type=RuntimeType.INTEGER.value,\n",
     ")\n",
     "clustering.start()"
    ]
@@ -328,8 +332,9 @@
    "outputs": [],
    "source": [
     "import pickle\n",
+    "from typing import Dict, Any\n",
     "from river import compose, tree\n",
-    "from streampipes.function_zoo.river_function import OnlineML\n",
+    "from streampipes.function_zoo.river_function import OnlineML, 
RiverFunction\n",
     "from streampipes.functions.utils.data_stream_generator import 
RuntimeType\n",
     "\n",
     "hoeffding_tree = compose.Pipeline(\n",
@@ -338,14 +343,13 @@
     ")\n",
     "\n",
     "\n",
-    "def draw_tree(self, event, streamId):\n",
+    "def draw_tree(self: RiverFunction, event: Dict[str, Any], streamId: 
str):\n",
     "    \"\"\"Draw the tree and save the image.\"\"\"\n",
     "    if self.learning:\n",
     "        if self.model[1].n_nodes != None:\n",
     "            self.model[1].draw().render(\"hoeffding_tree\", 
format=\"png\", cleanup=True)\n",
     "\n",
-    "\n",
-    "def save_model(self):\n",
+    "def save_model(self: RiverFunction):\n",
     "    \"\"\"Save the trained model.\"\"\"\n",
     "    with open(\"hoeffding_tree.pkl\", \"wb\") as f:\n",
     "        pickle.dump(self.model, f)\n",
@@ -353,8 +357,9 @@
     "\n",
     "regressor = OnlineML(\n",
     "    client=client,\n",
-    "    stream_ids=[\"sp:spdatastream:xboBFK\"],\n",
+    "    stream_ids=[\"Enter stream id here\"],\n",
     "    model=hoeffding_tree,\n",
+    "    output_stream_name=\"Hoeffding Tree Prediction\",\n",
     "    prediction_type=RuntimeType.FLOAT.value,\n",
     "    supervised=True,\n",
     "    target_label=\"temperature\",\n",
@@ -397,8 +402,9 @@
    "outputs": [],
    "source": [
     "import pickle\n",
+    "from typing import Dict, Any\n",
     "from river import compose, tree\n",
-    "from streampipes.function_zoo.river_function import OnlineML\n",
+    "from streampipes.function_zoo.river_function import OnlineML, 
RiverFunction\n",
     "from streampipes.functions.utils.data_stream_generator import 
RuntimeType\n",
     "\n",
     "decision_tree = compose.Pipeline(\n",
@@ -407,14 +413,14 @@
     ")\n",
     "\n",
     "\n",
-    "def draw_tree(self, event, streamId):\n",
+    "def draw_tree(self: RiverFunction, event: Dict[str, Any], streamId: 
str):\n",
     "    \"\"\"Draw the tree and save the image.\"\"\"\n",
     "    if self.learning:\n",
     "        if self.model[1].n_nodes != None:\n",
     "            self.model[1].draw().render(\"decicion_tree\", 
format=\"png\", cleanup=True)\n",
     "\n",
     "\n",
-    "def save_model(self):\n",
+    "def save_model(self: RiverFunction):\n",
     "    \"\"\"Save the trained model.\"\"\"\n",
     "    with open(\"decision_tree.pkl\", \"wb\") as f:\n",
     "        pickle.dump(self.model, f)\n",
@@ -422,8 +428,9 @@
     "\n",
     "classifier = OnlineML(\n",
     "    client=client,\n",
-    "    stream_ids=[\"sp:spdatastream:xboBFK\"],\n",
+    "    stream_ids=[\"Enter stream id here\"],\n",
     "    model=decision_tree,\n",
+    "    output_stream_name=\"DecisionTree Prediction\",\n",
     "    prediction_type=RuntimeType.BOOLEAN.value,\n",
     "    supervised=True,\n",
     "    target_label=\"sensor_fault_flags\",\n",
@@ -453,31 +460,31 @@
   },
   {
    "cell_type": "markdown",
-   "source": [
-    "That's already it! Isn't it truly easy to apply Online ML with 
StreamPipes and River? Please go ahead and apply it to your own use cases. We 
would be happy to hear about them!"
-   ],
    "metadata": {
     "collapsed": false
-   }
+   },
+   "source": [
+    "That's already it! Isn't it truly easy to apply Online ML with 
StreamPipes and River? Please go ahead and apply it to your own use cases. We 
would be happy to hear about them!"
+   ]
   },
   {
    "cell_type": "markdown",
-   "source": [
-    "Want to see more exciting use cases you can achieve with StreamPipes 
functions in Python? Then don’t hesitate and jump to our [next 
tutorial](../5-applying-interoperable-machine-learning-in-streampipes) on using 
interoperable machine learning algorithm models with StreamPipes Python and 
[ONNX](https://onnx.ai/)."
-   ],
    "metadata": {
     "collapsed": false
-   }
+   },
+   "source": [
+    "Want to see more exciting use cases you can achieve with StreamPipes 
functions in Python? Then don’t hesitate and jump to our [next 
tutorial](../5-applying-interoperable-machine-learning-in-streampipes) on using 
interoperable machine learning algorithm models with StreamPipes Python and 
[ONNX](https://onnx.ai/)."
+   ]
   },
   {
    "cell_type": "markdown",
+   "metadata": {},
    "source": [
     "How do you like this tutorial?\n",
     "We hope you like it and would love to receive some feedback from you.\n",
     "Just go to our [GitHub discussion 
page](https://github.com/apache/streampipes/discussions) and let us know your 
impression.\n",
     "We'll read and react to them all, we promise!"
-   ],
-   "metadata": {}
+   ]
   },
   {
    "cell_type": "markdown",
diff --git 
a/streampipes-client-python/streampipes/function_zoo/river_function.py 
b/streampipes-client-python/streampipes/function_zoo/river_function.py
index 910097eef2..01c475bd35 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -138,7 +138,9 @@ class OnlineML:
         The ids of the data stream to train the model.
     model: Any
         The model to train. It meant to be a River model/pipeline,
-        but can be every model with a 'learn_one' and 'predict_one' methode.
+        but can be every model with a 'learn_one' and 'predict_one' method.
+    output_stream_name: str
+        The name and id of the output stream that contains the prediction of 
the model.
     prediction_type: str
         The data type of the prediction.
         Is only needed when you continue to work with the prediction in 
StreamPipes.
@@ -159,6 +161,7 @@ class OnlineML:
         client: StreamPipesClient,
         stream_ids: List[str],
         model: Any,
+        output_stream_name: str = "Online ML Prediction",
         prediction_type: str = RuntimeType.STRING.value,
         supervised: bool = False,
         target_label: Optional[str] = None,
@@ -175,10 +178,10 @@ class OnlineML:
                 raise ValueError("You must define a target attribute for a 
supervised model.")
 
         output_stream = create_data_stream(
-            name="prediction",
+            name=output_stream_name,
             attributes=attributes,
             
broker=get_broker_description(client.dataStreamApi.get(stream_ids[0])),  # 
type: ignore
-            stream_id=stream_ids[0],
+            stream_id=output_stream_name,
         )
         function_definition = 
FunctionDefinition(consumed_streams=stream_ids).add_output_data_stream(output_stream)
         self.sp_function = RiverFunction(
diff --git 
a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
 
b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index 6587a2e53c..00dd200b03 100644
--- 
a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++ 
b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -16,7 +16,7 @@
 #
 
 from enum import Enum
-from typing import Dict
+from typing import Dict, Optional
 
 from streampipes.functions.broker import SupportedBroker
 from streampipes.model.common import (
@@ -53,7 +53,7 @@ class RuntimeType(Enum):
 def create_data_stream(
     name: str,
     attributes: Dict[str, str],
-    stream_id: str = None,
+    stream_id: Optional[str] = None,
     broker: SupportedBroker = SupportedBroker.NATS,
 ):
     """Creates a data stream
@@ -64,8 +64,8 @@ def create_data_stream(
         Name of the data stream to be shown at the UI.
     attributes: Dict[str, str]
         Name and types of the attributes.
-    stream_id: str
-        The id of this data stream.
+    stream_id: Optional[str]
+        The id of this data stream. If none is provided, the name is used as the id.
 
     Returns
     -------
@@ -103,6 +103,8 @@ def create_data_stream(
             )
         ]
 
+    if not stream_id:
+        stream_id = name
     sanitized_stream_id = stream_id.replace(" ", "")
 
     # Assign a default topic name incorporating the unique stream ID to each 
protocol.

Reply via email to