This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 
3545-problem-in-online-ml-library-regression-algorithm
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3545-problem-in-online-ml-library-regression-algorithm by this push:
     new b0fe9fa3b9 Fix online ml functions
b0fe9fa3b9 is described below

commit b0fe9fa3b94136b7a3d8f48ff412b9f0b928fdaf
Author: Sven Oehler <[email protected]>
AuthorDate: Wed Oct 1 12:09:01 2025 +0200

    Fix online ml functions
---
 ...ine-learning-on-a-streampipes-data-stream.ipynb | 49 ++++++++++++----------
 .../streampipes/function_zoo/river_function.py     |  9 ++--
 .../functions/utils/data_stream_generator.py       | 10 +++--
 3 files changed, 40 insertions(+), 28 deletions(-)

diff --git 
a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
 
b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
index adcc24363e..26fb9de7fe 100644
--- 
a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
+++ 
b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
@@ -29,7 +29,7 @@
    "outputs": [],
    "source": [
     "# you can install all required dependencies for this tutorial by 
executing the following command\n",
-    "%pip install river streampipes"
+    "%pip install river streampipes graphviz"
    ]
   },
   {
@@ -281,7 +281,11 @@
     ")\n",
     "\n",
     "clustering = OnlineML(\n",
-    "    client=client, stream_ids=[\"sp:spdatastream:xboBFK\"], 
model=k_means, prediction_type=RuntimeType.INTEGER.value\n",
+    "    client=client,\n",
+    "    stream_ids=[\"Enter stream id here\"],\n",
+    "    output_stream_name=\"KMeans Prediction\",\n",
+    "    model=k_means,\n",
+    "    prediction_type=RuntimeType.INTEGER.value,\n",
     ")\n",
     "clustering.start()"
    ]
@@ -328,8 +332,9 @@
    "outputs": [],
    "source": [
     "import pickle\n",
+    "from typing import Dict, Any\n",
     "from river import compose, tree\n",
-    "from streampipes.function_zoo.river_function import OnlineML\n",
+    "from streampipes.function_zoo.river_function import OnlineML, 
RiverFunction\n",
     "from streampipes.functions.utils.data_stream_generator import 
RuntimeType\n",
     "\n",
     "hoeffding_tree = compose.Pipeline(\n",
@@ -338,14 +343,13 @@
     ")\n",
     "\n",
     "\n",
-    "def draw_tree(self, event, streamId):\n",
+    "def draw_tree(self: RiverFunction, event: Dict[str, Any], streamId: 
str):\n",
     "    \"\"\"Draw the tree and save the image.\"\"\"\n",
     "    if self.learning:\n",
     "        if self.model[1].n_nodes != None:\n",
     "            self.model[1].draw().render(\"hoeffding_tree\", 
format=\"png\", cleanup=True)\n",
     "\n",
-    "\n",
-    "def save_model(self):\n",
+    "def save_model(self: RiverFunction):\n",
     "    \"\"\"Save the trained model.\"\"\"\n",
     "    with open(\"hoeffding_tree.pkl\", \"wb\") as f:\n",
     "        pickle.dump(self.model, f)\n",
@@ -353,8 +357,9 @@
     "\n",
     "regressor = OnlineML(\n",
     "    client=client,\n",
-    "    stream_ids=[\"sp:spdatastream:xboBFK\"],\n",
+    "    stream_ids=[\"Enter stream id here\"],\n",
     "    model=hoeffding_tree,\n",
+    "    output_stream_name=\"Hoeffding Tree Prediction\",\n",
     "    prediction_type=RuntimeType.FLOAT.value,\n",
     "    supervised=True,\n",
     "    target_label=\"temperature\",\n",
@@ -397,8 +402,9 @@
    "outputs": [],
    "source": [
     "import pickle\n",
+    "from typing import Dict, Any\n",
     "from river import compose, tree\n",
-    "from streampipes.function_zoo.river_function import OnlineML\n",
+    "from streampipes.function_zoo.river_function import OnlineML, 
RiverFunction\n",
     "from streampipes.functions.utils.data_stream_generator import 
RuntimeType\n",
     "\n",
     "decision_tree = compose.Pipeline(\n",
@@ -407,14 +413,14 @@
     ")\n",
     "\n",
     "\n",
-    "def draw_tree(self, event, streamId):\n",
+    "def draw_tree(self: RiverFunction, event: Dict[str, Any], streamId: 
str):\n",
     "    \"\"\"Draw the tree and save the image.\"\"\"\n",
     "    if self.learning:\n",
     "        if self.model[1].n_nodes != None:\n",
     "            self.model[1].draw().render(\"decicion_tree\", 
format=\"png\", cleanup=True)\n",
     "\n",
     "\n",
-    "def save_model(self):\n",
+    "def save_model(self: RiverFunction):\n",
     "    \"\"\"Save the trained model.\"\"\"\n",
     "    with open(\"decision_tree.pkl\", \"wb\") as f:\n",
     "        pickle.dump(self.model, f)\n",
@@ -422,8 +428,9 @@
     "\n",
     "classifier = OnlineML(\n",
     "    client=client,\n",
-    "    stream_ids=[\"sp:spdatastream:xboBFK\"],\n",
+    "    stream_ids=[\"Enter stream id here\"],\n",
     "    model=decision_tree,\n",
+    "    output_stream_name=\"DecisionTree Prediction\",\n",
     "    prediction_type=RuntimeType.BOOLEAN.value,\n",
     "    supervised=True,\n",
     "    target_label=\"sensor_fault_flags\",\n",
@@ -453,31 +460,31 @@
   },
   {
    "cell_type": "markdown",
-   "source": [
-    "That's already it! Isn't it truly easy to apply Online ML with 
StreamPipes and River? Please go ahead and apply it to your own use cases. We 
would be happy to hear about them!"
-   ],
    "metadata": {
     "collapsed": false
-   }
+   },
+   "source": [
+    "That's already it! Isn't it truly easy to apply Online ML with 
StreamPipes and River? Please go ahead and apply it to your own use cases. We 
would be happy to hear about them!"
+   ]
   },
   {
    "cell_type": "markdown",
-   "source": [
-    "Want to see more exciting use cases you can achieve with StreamPipes 
functions in Python? Then don’t hesitate and jump to our [next 
tutorial](../5-applying-interoperable-machine-learning-in-streampipes) on using 
interoperable machine learning algorithm models with StreamPipes Python and 
[ONNX](https://onnx.ai/)."
-   ],
    "metadata": {
     "collapsed": false
-   }
+   },
+   "source": [
+    "Want to see more exciting use cases you can achieve with StreamPipes 
functions in Python? Then don’t hesitate and jump to our [next 
tutorial](../5-applying-interoperable-machine-learning-in-streampipes) on using 
interoperable machine learning algorithm models with StreamPipes Python and 
[ONNX](https://onnx.ai/)."
+   ]
   },
   {
    "cell_type": "markdown",
+   "metadata": {},
    "source": [
     "How do you like this tutorial?\n",
     "We hope you like it and would love to receive some feedback from you.\n",
     "Just go to our [GitHub discussion 
page](https://github.com/apache/streampipes/discussions) and let us know your 
impression.\n",
     "We'll read and react to them all, we promise!"
-   ],
-   "metadata": {}
+   ]
   },
   {
    "cell_type": "markdown",
diff --git 
a/streampipes-client-python/streampipes/function_zoo/river_function.py 
b/streampipes-client-python/streampipes/function_zoo/river_function.py
index 910097eef2..01c475bd35 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -138,7 +138,9 @@ class OnlineML:
         The ids of the data stream to train the model.
     model: Any
         The model to train. It meant to be a River model/pipeline,
-        but can be every model with a 'learn_one' and 'predict_one' methode.
+        but can be every model with a 'learn_one' and 'predict_one' method.
+    output_stream_name: str
+        The name and id of the output stream that contains the prediction of 
the model.
     prediction_type: str
         The data type of the prediction.
         Is only needed when you continue to work with the prediction in 
StreamPipes.
@@ -159,6 +161,7 @@ class OnlineML:
         client: StreamPipesClient,
         stream_ids: List[str],
         model: Any,
+        output_stream_name: str = "Online ML Prediction",
         prediction_type: str = RuntimeType.STRING.value,
         supervised: bool = False,
         target_label: Optional[str] = None,
@@ -175,10 +178,10 @@ class OnlineML:
                 raise ValueError("You must define a target attribute for a 
supervised model.")
 
         output_stream = create_data_stream(
-            name="prediction",
+            name=output_stream_name,
             attributes=attributes,
             
broker=get_broker_description(client.dataStreamApi.get(stream_ids[0])),  # 
type: ignore
-            stream_id=stream_ids[0],
+            stream_id=output_stream_name,
         )
         function_definition = 
FunctionDefinition(consumed_streams=stream_ids).add_output_data_stream(output_stream)
         self.sp_function = RiverFunction(
diff --git 
a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
 
b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index 6587a2e53c..00dd200b03 100644
--- 
a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++ 
b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -16,7 +16,7 @@
 #
 
 from enum import Enum
-from typing import Dict
+from typing import Dict, Optional
 
 from streampipes.functions.broker import SupportedBroker
 from streampipes.model.common import (
@@ -53,7 +53,7 @@ class RuntimeType(Enum):
 def create_data_stream(
     name: str,
     attributes: Dict[str, str],
-    stream_id: str = None,
+    stream_id: Optional[str] = None,
     broker: SupportedBroker = SupportedBroker.NATS,
 ):
     """Creates a data stream
@@ -64,8 +64,8 @@ def create_data_stream(
         Name of the data stream to be shown at the UI.
     attributes: Dict[str, str]
         Name and types of the attributes.
-    stream_id: str
-        The id of this data stream.
+    stream_id: Optional[str]
+        The id of this data stream. If none is provided, the name is used as 
the id.
 
     Returns
     -------
@@ -103,6 +103,8 @@ def create_data_stream(
             )
         ]
 
+    if not stream_id:
+        stream_id = name
     sanitized_stream_id = stream_id.replace(" ", "")
 
     # Assign a default topic name incorporating the unique stream ID to each 
protocol.

Reply via email to