This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch rel/0.91.0
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/rel/0.91.0 by this push:
new 705bd340b docs: spring cleaning for Python docs (#1419)
705bd340b is described below
commit 705bd340bd8b48cf35fd5b600a89555db6560641
Author: Tim <[email protected]>
AuthorDate: Mon Mar 20 19:26:25 2023 +0100
docs: spring cleaning for Python docs (#1419)
* chore: increase required coverage for documentation
Signed-off-by: bossenti <[email protected]>
* chore: refine metadata of package
Signed-off-by: bossenti <[email protected]>
* docs: refine mkdocs setup
Signed-off-by: bossenti <[email protected]>
* docs: improve docs
Signed-off-by: bossenti <[email protected]>
---------
Signed-off-by: bossenti <[email protected]>
---
streampipes-client-python/.pre-commit-config.yaml | 2 +-
streampipes-client-python/README.md | 4 +-
.../docs/getting-started/developing.md | 18 ++--
.../docs/getting-started/first-steps.md | 27 ++++--
.../docs/getting-started/quickstart.md | 59 ++++++++-----
streampipes-client-python/docs/index.md | 18 ++--
...introduction-to-streampipes-python-client.ipynb | 10 +--
...cting-data-from-the-streampipes-data-lake.ipynb | 15 +++-
...ive-data-from-the-streampipes-data-stream.ipynb | 27 +++---
...ine-learning-on-a-streampipes-data-stream.ipynb | 6 +-
streampipes-client-python/mkdocs.yml | 25 +++---
streampipes-client-python/setup.py | 7 ++
.../streampipes/client/client.py | 90 ++++++++++++++------
.../streampipes/client/config.py | 10 ++-
.../streampipes/client/credential_provider.py | 16 ++--
.../streampipes/endpoint/api/data_lake_measure.py | 98 ++++++++++++----------
.../streampipes/endpoint/api/data_stream.py | 45 +++++-----
.../streampipes/endpoint/endpoint.py | 46 +++++-----
.../streampipes/function_zoo/river_function.py | 43 ++++++++--
.../streampipes/functions/broker/broker.py | 21 ++---
.../streampipes/functions/broker/broker_handler.py | 18 ++--
.../streampipes/functions/broker/kafka_broker.py | 9 +-
.../functions/broker/kafka_message_fetcher.py | 6 +-
.../streampipes/functions/broker/nats_broker.py | 10 ++-
.../functions/broker/output_collector.py | 28 +++++--
.../streampipes/functions/function_handler.py | 16 +++-
.../streampipes/functions/registration.py | 17 +++-
.../streampipes/functions/streampipes_function.py | 19 ++++-
.../functions/utils/async_iter_handler.py | 23 +++--
.../functions/utils/data_stream_context.py | 4 +-
.../functions/utils/data_stream_generator.py | 17 +++-
.../functions/utils/function_context.py | 9 +-
.../streampipes/model/common.py | 23 ++++-
.../model/container/data_lake_measures.py | 9 +-
.../streampipes/model/container/data_streams.py | 9 +-
.../model/container/resource_container.py | 13 ++-
.../model/resource/data_lake_measure.py | 12 ++-
.../streampipes/model/resource/data_lake_series.py | 10 ++-
.../streampipes/model/resource/data_stream.py | 7 +-
.../model/resource/function_definition.py | 79 +++++++++++------
.../streampipes/model/resource/resource.py | 11 ++-
41 files changed, 604 insertions(+), 332 deletions(-)
diff --git a/streampipes-client-python/.pre-commit-config.yaml
b/streampipes-client-python/.pre-commit-config.yaml
index 23786aa8c..a9bfe3a6d 100644
--- a/streampipes-client-python/.pre-commit-config.yaml
+++ b/streampipes-client-python/.pre-commit-config.yaml
@@ -33,7 +33,7 @@ repos:
name: interrogate
language: python
types: [ python ]
- entry: interrogate -vv --fail-under 95 --omit-covered-files
--ignore-init-method --ignore-module --ignore-magic --ignore-regex test_*
--ignore-regex Test*
+ entry: interrogate -vv --fail-under 100 --omit-covered-files
--ignore-init-method --ignore-module --ignore-magic --ignore-regex test_*
--ignore-regex Test*
- id: pyupgrade
name: pyupgrade
diff --git a/streampipes-client-python/README.md
b/streampipes-client-python/README.md
index 40828c76e..c5225328d 100644
--- a/streampipes-client-python/README.md
+++ b/streampipes-client-python/README.md
@@ -45,7 +45,7 @@ and the amazing universe of data analytics libraries in
Python. </p>
## 📚 Documentation
Please visit our documentation:
https://streampipes.apache.org/docs/docs/python/latest/
There you can find information about how to [get
started](https://streampipes.apache.org/docs/docs/python/latest/getting-started/first-steps/),
-follow some
[examples](https://streampipes.apache.org/docs/docs/python/latest/examples/1-introduction-to-streampipes-python-client/),
+follow some
[tutorials](https://streampipes.apache.org/docs/docs/python/latest/tutorials/1-introduction-to-streampipes-python-client/),
or discover the library via our
[references](https://streampipes.apache.org/docs/docs/python/latest/reference/client/client/).
<br>
@@ -93,4 +93,4 @@ The following StreamPipes resources are available with this
client:
1x DataLakeMeasures
```
-For more information about how to use the StreamPipes client visit our
[introduction
example](https://streampipes.apache.org/docs/docs/python/latest/examples/1-introduction-to-streampipes-python-client/).
+For more information about how to use the StreamPipes client visit our
[introduction
tutorial](https://streampipes.apache.org/docs/docs/python/latest/tutorials/1-introduction-to-streampipes-python-client/).
diff --git a/streampipes-client-python/docs/getting-started/developing.md
b/streampipes-client-python/docs/getting-started/developing.md
index a84fa203b..736506c52 100644
--- a/streampipes-client-python/docs/getting-started/developing.md
+++ b/streampipes-client-python/docs/getting-started/developing.md
@@ -17,8 +17,7 @@
-->
## 📖 Development Guide
-This document describes how to easily set up your local dev environment to
work on the
-StreamPipes Python client 🐍.
+This document describes how to easily set up your local dev environment to
work on StreamPipes Python 🐍.
<br>
### 🚀 First Steps
@@ -43,26 +42,27 @@ pip install ."[dev]"
The pre-commit hook is run before every commit and takes care about code style,
linting, type hints, import sorting, etc. It will stop your commit in case the
changes do not apply the expected format.
Always check to have the recent version of the pre-commit hook installed
otherwise the CI build might fail.
-If you are interested you can have a deeper look on the underlying library:
[pre-commit](https://pre-commit.com/).
+If you are interested, you can have a deeper look on the underlying library:
[pre-commit](https://pre-commit.com/).
```bash
pre-commit install
```
-The definition of the pre-commit hook can be found in
[.pre-commit-config.yaml](.pre-commit-config.yaml).
+The definition of the pre-commit hook can be found in
[.pre-commit-config.yaml](https://github.com/apache/streampipes/blob/dev/streampipes-client-python/.pre-commit-config.yaml).
<br>
### 👏 Conventions
-Below we list some conventions that we have agreed on for creating the
StreamPipes Python client.
+Below we list some conventions that we have agreed on for creating StreamPipes
Python.
Please comply to them when you plan to contribute to this project.
-If you have any other suggestions or would like to discuss them, we would be
happy to hear from you on our mailing list
[[email protected]](mailto:[email protected]).
+If you have any other suggestions or would like to discuss them, we would be
happy to hear from you on our mailing list
[[email protected]](mailto:[email protected])
+or in our [discussions](https://github.com/apache/streampipes/discussions) on
GitHub.
1) **Use `numpy` style for Python docstrings** 📄 <br>
-Please stick to the `numpy` style when writing docstrings, as we require this
for generating our documentation.
+Please stick to the `numpy`
[style](https://numpydoc.readthedocs.io/en/latest/format.html) when writing
docstrings, as we require this for generating our documentation.
2) **Provide tests** ✅ <br>
-We are aiming for broad test coverage for the Python client and
+We are aiming for broad test coverage for the Python package and
have therefore set a requirement of at least 90% unit test coverage.
Therefore, please remember to write (unit) tests already during development.
If you have problems with writing tests, don't hesitate to ask us for help
directly in the PR or
@@ -73,7 +73,7 @@ even before that via our mailing list (see above).
TODO: replace link to java file by link to documentation
--->
3) **Build a similar API as the Java client provides** 🔄 <br>
-Whenever possible, please try to develop the API of the Python library the
same as the [Java
client](../streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java)
or Java SDK.
+Whenever possible, please try to develop the API of the Python library the
same as the [Java
client](https://github.com/apache/streampipes/blob/dev/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java)
or Java SDK.
By doing so, we would like to provide a consistent developer experience and
the basis for automated testing in the future.
---
diff --git a/streampipes-client-python/docs/getting-started/first-steps.md
b/streampipes-client-python/docs/getting-started/first-steps.md
index 8593903b9..8c75d7636 100644
--- a/streampipes-client-python/docs/getting-started/first-steps.md
+++ b/streampipes-client-python/docs/getting-started/first-steps.md
@@ -23,6 +23,7 @@ You can install the latest development version from GitHub,
as so:
```bash
pip install streampipes
+
# if you want to have the current development state you can also execute
pip install
git+https://github.com/apache/streampipes.git#subdirectory=streampipes-client-python
# the corresponding documentation can be found here:
https://streampipes.apache.org/docs/docs/python/dev/
@@ -32,12 +33,8 @@ pip install
git+https://github.com/apache/streampipes.git#subdirectory=streampip
When working with the StreamPipes Python library it is inevitable to have a
running StreamPipes instance to connect and interact with.
In case you don't have a running instance at hand, you can easily set up one
on your local machine.
Hereby you need to consider that StreamPipes supports different message broker
(e.g., Kafka, NATS).
-Although the Python library aims to support all of them equally, we encourage
you to run StreamPipes with the NATS protocol as the messaging layer.
-If you are using a different messaging broker and experience a problem, please
do not hesitate to contact us.
-In case you are unsure if it is indeed a bug, please feel free to start a
[discussion](https://github.com/apache/streampipes/discussions) on GitHub.
-Alternatively, file us a bug in form of a GitHub
[issue](https://github.com/apache/streampipes/issues/new/choose).
+We will demonstrate below how you can easily set up StreamPipes for both
supported message brokers.
<br>
-The following shows how you can set up a StreamPipes instance that uses
[NATS](https://docs.nats.io/) as messaging layer.
### 🐳 Start StreamPipes via Docker Compose
The easiest and therefore recommended way to get StreamPipes started is by using
[docker compose](https://docs.docker.com/compose/).
@@ -51,9 +48,11 @@ CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
NAMES
... ... ... ... ... ... ...
```
Otherwise, you need to start docker first.
+Please read the full guide on how to start StreamPipes with `docker compose`
[here](https://streampipes.apache.org/docs/docs/deploy-docker.html).
-Please read the full guide on how to start StreamPipes with `docker compose`
[here](https://github.com/apache/streampipes/blob/dev/installer/compose/README.md).
-So in our scenario, we will go with `docker-compose.nats.yml` to use NATS as
messaging backend.
+#### Setup StreamPipes with NATS as message broker
+The following shows how you can set up a StreamPipes instance that uses
[NATS](https://docs.nats.io/) as messaging layer.
+So in this scenario, we will go with `docker-compose.nats.yml`.
Thereby, when running locally, we need to add the following port mapping entry
to `services.nats.ports`:
```yaml
- 4222:4222
@@ -64,6 +63,18 @@ After this modification is applied, StreamPipes can simply
be started with this
docker-compose -f docker-compose.nats.yml up -d
```
-Once all services are started, you can access StreamPipes via
`http://localhost`.
+Once all services are started, you can access StreamPipes via
`http://localhost`.
+
+#### Setup StreamPipes with Kafka as message broker
+Alternatively, you can use `docker-compose.yml` to start StreamPipes with
Kafka as messaging layer.
+Therefore, you only need to execute the following command:
+```bash
+docker-compose -f docker-compose.yml up -d
+```
+
+Once all services are started, you can access StreamPipes via
`http://localhost`.
+
+In case you want to have more control over your StreamPipes setup,
+you might take a look at our [deployment
CLI](https://streampipes.apache.org/docs/docs/extend-cli.html).
Have fun discovering StreamPipes and our Python library 🚀
diff --git a/streampipes-client-python/docs/getting-started/quickstart.md
b/streampipes-client-python/docs/getting-started/quickstart.md
index 29e4be51a..02fd517cc 100644
--- a/streampipes-client-python/docs/getting-started/quickstart.md
+++ b/streampipes-client-python/docs/getting-started/quickstart.md
@@ -19,30 +19,47 @@
# ⚡️ Quickstart
As a quick example, we demonstrate how to set up and configure a StreamPipes
client.
+In addition, we will get the available data lake measures out of StreamPipes.
```python
->>> from streampipes.client import StreamPipesClient
->>> from streampipes.client.config import StreamPipesClientConfig
->>> from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
+from streampipes.client import StreamPipesClient
+from streampipes.client.config import StreamPipesClientConfig
+from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
+
+config = StreamPipesClientConfig(
+ credential_provider = StreamPipesApiKeyCredentials(
+ username = "[email protected]",
+ api_key = "DEMO-KEY",
+ ),
+ host_address = "localhost",
+ https_disabled = True,
+ port = 80
+)
+
+client = StreamPipesClient(client_config=config)
+
+# get all available data lake measures
+measures = client.dataLakeMeasureApi.all()
->>> config = StreamPipesClientConfig(
-... credential_provider = StreamPipesApiKeyCredentials(
-... username = "[email protected]",
-... api_key = "DEMO-KEY",
-... ),
-... host_address = "localhost",
-... http_disabled = True,
-... port = 80
-...)
-
->>> client = StreamPipesClient(client_config=config)
->>> client.describe()
-
-Hi there!
-You are connected to a StreamPipes instance running at http://localhost: 80.
-The following StreamPipes resources are available with this client:
-6x DataStreams
-1x DataLakeMeasures
+# get amount of retrieved measures
+len(measures)
+```
+Output:
+```
+1
+```
+<br>
+
+```
+# inspect the data lake measures as pandas dataframe
+measures.to_pandas()
+```
+
+Output:
+```
+measure_name timestamp_field ... pipeline_is_running num_event_properties
+0 test s0::timestamp ... False 2
+[1 rows x 6 columns]
```
<br>
Alternatively, you can provide your credentials via environment variables.
diff --git a/streampipes-client-python/docs/index.md
b/streampipes-client-python/docs/index.md
index d5d94d57b..7f6115a90 100644
--- a/streampipes-client-python/docs/index.md
+++ b/streampipes-client-python/docs/index.md
@@ -23,13 +23,13 @@
alt="StreamPipes Logo with Python" title="Apache StreamPipes Logo with
Python" width="75%"/>
<br>
</h1>
-<h4 align="center"><a
href="[StreamPipes](https://github.com/apache/streampipes)">StreamPipes</a> is
a self-service (Industrial) IoT toolbox to enable non-technical users to
connect , analyze and explore IoT data streams.</h4>
+<h4 align="center"><a
href="https://github.com/apache/streampipes">StreamPipes</a> is a self-service
(Industrial) IoT toolbox to enable non-technical users to connect, analyze and
explore IoT data streams.</h4>
<br>
-<h3 align="center">Apache StreamPipes for Python</h3>
+<h3 align="center">Apache StreamPipes for Python 🐍</h3>
<p align="center"> Apache StreamPipes meets Python! We are working highly
motivated on a Python library to interact with StreamPipes.
-In this way, we would like to unite the power of StreamPipes to easily connect
to and read different data sources, especially in the IoT domain,
+In this way, we would like to unite the power of StreamPipes to easily connect
to and read from different data sources, especially in the IoT domain,
and the amazing universe of data analytics libraries in Python. </p>
---
@@ -41,12 +41,6 @@ This means that it is still heavily under development, which
may result in frequ
</p>
---
-**🚧 Currently, we do not already version our Python documentation.
-Therefore, the provided docs always represent the development state.
-Please read our [getting started guide](./getting-started/first-steps.md) to
find out how to install the development version of StreamPipes python.
-We will provide a versioned documentation as soon as possible. Stay tuned!**
----
-
## ⚡️ Quickstart
As a quick example, we demonstrate how to set up and configure a StreamPipes
client.
@@ -63,7 +57,7 @@ config = StreamPipesClientConfig(
api_key = "DEMO-KEY",
),
host_address = "localhost",
- http_disabled = True,
+ https_disabled = True,
port = 80
)
@@ -97,7 +91,7 @@ Alternatively, you can provide your credentials via
environment variables.
Simply define your credential provider as follows:
```python
->>> from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
+from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
StreamPipesApiKeyCredentials.from_env(username_env="USER",
api_key_env="API-KEY")
```
@@ -106,4 +100,4 @@ StreamPipesApiKeyCredentials.from_env(username_env="USER",
api_key_env="API-KEY"
`username` is always the username that is used to log in into StreamPipes. <br>
The `api_key` can be generated within the UI as demonstrated below:
-
\ No newline at end of file
+
\ No newline at end of file
diff --git
a/streampipes-client-python/docs/examples/1-introduction-to-streampipes-python-client.ipynb
b/streampipes-client-python/docs/tutorials/1-introduction-to-streampipes-python-client.ipynb
similarity index 94%
rename from
streampipes-client-python/docs/examples/1-introduction-to-streampipes-python-client.ipynb
rename to
streampipes-client-python/docs/tutorials/1-introduction-to-streampipes-python-client.ipynb
index d3eb17eeb..cf5ff5cff 100644
---
a/streampipes-client-python/docs/examples/1-introduction-to-streampipes-python-client.ipynb
+++
b/streampipes-client-python/docs/tutorials/1-introduction-to-streampipes-python-client.ipynb
@@ -14,7 +14,7 @@
"You then can make the data available outside StreamPipes by writing it
into an external source, such as a database, Kafka, etc.\n",
"While this requires another component, you can also extract your data
directly from StreamPipes programmatically using the StreamPipes API.\n",
"For convenience, we also provide you with a StreamPipes client both
available for Java and Python.\n",
- "Specifically with the Python client, we want to address the amazing data
analytics and data science community in Python and benefit from the great
universe of Python libraries out there.\n",
+ "Specifically with StreamPipes Python, we want to address the amazing data
analytics and data science community in Python and benefit from the great
universe of Python libraries out there.\n",
"\n",
"<br>\n",
"\n",
@@ -172,7 +172,7 @@
"outputs": [],
"source": [
"%export USER=\"<USERNAME>\"\n",
- "%export"
+ "%export API_KEY=\"<API-KEY>\""
],
"metadata": {
"collapsed": false
@@ -240,15 +240,15 @@
"\n",
"<br>\n",
"\n",
- "The created `client` instance serves as the central point of interaction
for StreamPipes.\n",
+ "The created `client` instance serves as the central point of interaction
with StreamPipes.\n",
"You can invoke a variety of commands directly on this object.\n",
"\n",
"Are you curious now how you actually can get data out of StreamPipes and
make use of it with Python?\n",
- "Then check out the next example on [extracting Data from the StreamPipes
data lake](../2-extracting-data-from-the-streampipes-data-lake).\n",
+ "Then check out the next tutorial on [extracting Data from the StreamPipes
data lake](../2-extracting-data-from-the-streampipes-data-lake).\n",
"\n",
"<br>\n",
"\n",
- "Thanks for reading this introductory example.\n",
+ "Thanks for reading this introductory tutorial.\n",
"We hope you like it and would love to receive some feedback from you.\n",
"Just go to our [GitHub discussion
page](https://github.com/apache/streampipes/discussions) and let us know your
impression.\n",
"We'll read and react to them all, we promise!"
diff --git
a/streampipes-client-python/docs/examples/2-extracting-data-from-the-streampipes-data-lake.ipynb
b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
similarity index 99%
rename from
streampipes-client-python/docs/examples/2-extracting-data-from-the-streampipes-data-lake.ipynb
rename to
streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
index fb9facf5b..6f1d7d81d 100644
---
a/streampipes-client-python/docs/examples/2-extracting-data-from-the-streampipes-data-lake.ipynb
+++
b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
@@ -5,7 +5,7 @@
"source": [
"# Extracting Data from the StreamPipes data lake\n",
"\n",
- "In the first example ([Introduction to the StreamPipes Python
client](../1-introduction-to-streampipes-python-client)) we took the first
steps with the StreamPipes Python client and learned how to set everything
up.\n",
+ "In the first tutorial ([Introduction to the StreamPipes Python
client](../1-introduction-to-streampipes-python-client)) we took the first
steps with the StreamPipes Python client and learned how to set everything
up.\n",
"Now we are ready to get started and want to retrieve some data out of
StreamPipes.\n",
"In this tutorial, we'll focus on the StreamPipes Data Lake, the component
where StreamPipes stores data internally.\n",
"To get started, we'll use the `client` instance created in the first
tutorial."
@@ -179,7 +179,7 @@
{
"cell_type": "markdown",
"source": [
- "To ge a more comprehensive overview you can take a look at the `pandas`
representation"
+ "To get a more comprehensive overview, you can take a look at the
[`pandas`](https://pandas.pydata.org/) representation"
],
"metadata": {
"collapsed": false
@@ -470,7 +470,7 @@
"cell_type": "markdown",
"source": [
"... from this point on we leave all future processing of the data up to
your creativity.\n",
- "Keep in mind: the general syntax used in this example (`all()`,
`to_pandas()`, `get()`) applies to all endpoints and associated resources of
the StreamPipes Python client.\n",
+ "Keep in mind: the general syntax used in this tutorial (`all()`,
`to_pandas()`, `get()`) applies to all endpoints and associated resources of
the StreamPipes Python client.\n",
"\n",
"If you get further and create exiting stuff with data extracted from
StreamPipes please [let us
know](https://github.com/apache/streampipes/discussions/categories/show-and-tell).\n",
"We are thrilled to see what you as a community will build with the
provided client.\n",
@@ -480,6 +480,15 @@
"collapsed": false
}
},
+ {
+ "cell_type": "markdown",
+ "source": [
+ "For now, that's all about the StreamPipes client. Read the next tutorial
([Getting live data from the StreamPipes data
stream](../3-getting-live-data-from-the-streampipes-data-stream)) if you are
interested in making use of the powerful [StreamPipes
functions](https://streampipes.apache.org/docs/docs/extend-sdk-functions.html)
to interact with StreamPipes event-based."
+ ],
+ "metadata": {
+ "collapsed": false
+ }
+ },
{
"cell_type": "markdown",
"source": [
diff --git
a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
similarity index 98%
rename from
streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
rename to
streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index c9d97a461..0a2187486 100644
---
a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++
b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -6,7 +6,7 @@
"metadata": {},
"source": [
"# Getting live data from the StreamPipes data stream\n",
- "In the last example ([Extracting Data from the StreamPipes data
lake](../2-extracting-data-from-the-streampipes-data-lake)) we learned how to
extract the stored data from a StreamPipes data lake. This tutorial is about
the StreamPipes data stream and shows how to get the live data from a stream.
Therefore, we first create the `client` instance as before."
+ "In the last tutorial ([Extracting Data from the StreamPipes data
lake](../2-extracting-data-from-the-streampipes-data-lake)) we learned how to
extract the stored data from a StreamPipes data lake. This tutorial is about
the StreamPipes data stream and shows how to get the live data from StreamPipes
into Python. Therefore, we first create the `client` instance as before."
]
},
{
@@ -15,13 +15,9 @@
"metadata": {},
"source": [
"**Note** \n",
- "At the moment we only supports NATS as messaging protocol when working
with StreamPipes Functions, so make sure to use the `docker-compose.nats.yml`
file when starting StreamPipes. You also have to insert the port mapping to
this file when working locally:\n",
- "```\n",
- "nats:\n",
- " ...\n",
- " ports: \n",
- " - 4222:4222\n",
- "```"
+ "As of now we mainly developed the support for StreamPipes functions using
NATS as messaging protocol. Consequently, this setup is tested most and should
work flawlessly. Visit our [first-steps](../../getting-started/first-steps)
page to see how to start StreamPipes accordingly.\n",
+ "Anyhow, you can also use the other
[brokers](../../reference/functions/broker/broker) that are currently supported
in StreamPipes Python.\n",
+ "In case you observe any problems, please reach out to us and file us an
[issue](https://github.com/apache/streampipes/issues) on GitHub."
]
},
{
@@ -247,13 +243,13 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "Next we can create a StreamPipesFunction. For this we need to implement
the 5 following methods:\n",
+ "Next we can create a StreamPipesFunction. For this we need to implement
the 4 following methods:\n",
"- In `requiredStreamIds` you need to insert the `element_id` of the
required streams.\n",
"- `onServiceStarted` is called when the function gets started. There you
can use the given meta information of the `FunctionContext` to initialize the
function.\n",
"- `onEvent` is called when ever a new event arrives. The `event` contains
the live data and you can use the `streamId` to identify a stream if the
function is connected to multiple data streams.\n",
"- `onServiceStopped` is called when the function gets stopped.\n",
"\n",
- "For this example we just create a function that saves every new event in
a `pandas DataFrame` and plots the first column of the `DataFrame` when the
function gets stopped."
+ "For this tutorial we just create a function that saves every new event in
a `pandas DataFrame` and plots the first column of the `DataFrame` when the
function gets stopped."
]
},
{
@@ -590,9 +586,18 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "That's enough for this example. Now you can try to write your own
`StreamPipesFunction`. All you need to do is creating a new class, implementing
the 5 required methods and registering the function.\n"
+ "That's enough for this tutorial. Now you can try to write your own
`StreamPipesFunction`. All you need to do is creating a new class, implementing
the 4 required methods and registering the function.\n"
]
},
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Want to see more exciting use cases you can achieve with StreamPipes
functions in Python? Then don't hesitate and jump to our [next
tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on
applying online machine learning algorithms to StreamPipes data streams with
[River](https://riverml.xyz)."
+ ],
+ "metadata": {
+ "collapsed": false
+ }
+ },
{
"attachments": {},
"cell_type": "markdown",
diff --git
a/streampipes-client-python/docs/examples/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
similarity index 94%
rename from
streampipes-client-python/docs/examples/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
rename to
streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
index a3d933a27..8e6673ab4 100644
---
a/streampipes-client-python/docs/examples/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
+++
b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
@@ -6,7 +6,7 @@
"metadata": {},
"source": [
"# Using Online Machine Learning on a StreamPipes data stream\n",
- "The last example ([Getting live data from the StreamPipes data
stream](../3-getting-live-data-from-the-streampipes-data-stream)) showed how we
can connect to a data stream, and it would be possible to use Online Machine
Learning with this approach and train a model with the incoming events at the
`onEvent` method. However, the StreamPipes client also provides an easier way
to do this with the use of the River library for Online Machine Learning. We
will have a look at this now."
+ "The last tutorial ([Getting live data from the StreamPipes data
stream](../3-getting-live-data-from-the-streampipes-data-stream)) showed how we
can connect to a data stream, and it would be possible to use Online Machine
Learning with this approach and train a model with the incoming events at the
`onEvent` method. However, the StreamPipes client also provides an easier way
to do this with the use of the [River library](https://riverml.xyz) for Online
Machine Learning. We will have [...]
]
},
{
@@ -222,12 +222,12 @@
"source": [
"## How to use Online Machine Learning with StreamPipes\n",
"After we configured the client as usual, we can start with the new part.
\n",
- "The approach is straight forward and you can start with the Machine
Learning in just 3 steps:\n",
+ "The approach is straight forward and you can start with the ML part in
just 3 steps:\n",
"1. Create a pipeline with River and insert the preprocessing steps and
model of your choice.\n",
"2. Configure the `OnlineML` wrapper to fit to your model and insert the
client and required data stream ids.\n",
"3. Start the wrapper and let the learning begin.\n",
"\n",
- "Now a StreamPipesFunction will be started and it trains the model for
every new event. It also creates an output data stream which will send the
prediction of the model back to StreamPipes. This output stream can be seen
when creating a new pipeline and can be used like every other data source. So
you can use it in a pipeline and save the predictions in a Data Lake.\n",
+ "A StreamPipesFunction is then started, which trains the model for each
new event. It also creates an output data stream which will send the prediction
of the model back to StreamPipes. This output stream can be seen when creating
a new pipeline and can be used like every other data source. So you can use it
in a pipeline and save the predictions in a Data Lake.\n",
"You can also stop and start the training with the method `set_learning`.
To stop the whole function use the `stop` methode and if you want to delete the
output stream entirely, you can go to the `Pipeline Element Installer` in
StreamPipes and uninstall it. \n",
" \n",
"Now let's take a look at some examples. If you want to execute the
examples below you have to create an adapter for the `Machine Data Simulator`,
select the `flowrate` sensor and insert the stream id of this stream."
diff --git a/streampipes-client-python/mkdocs.yml
b/streampipes-client-python/mkdocs.yml
index 640a5a2d9..1a078a629 100644
--- a/streampipes-client-python/mkdocs.yml
+++ b/streampipes-client-python/mkdocs.yml
@@ -15,9 +15,14 @@
# limitations under the License.
#
-site_name: Apache StreamPipes for Python
+site_name: Apache StreamPipes Python
+site_description: Python library for Apache StreamPipes
+site_author: Apache Software Foundation (ASF)
+site_url: https://streampipes.apache.org/docs/docs/python/latest/
+
+repo_name: apache/streampipes
repo_url: https://github.com/apache/streampipes
-edit_uri: streampipes-client-python/docs
+edit_uri: blob/dev/streampipes-client-python/docs
copyright: "Apache License 2.0"
@@ -87,14 +92,14 @@ extra_css:
- stylesheets/extra.css
nav:
- - Home: index.md
- - Getting Started:
+ - 🏡 Home: index.md
+ - 🚀 Getting Started:
- First Steps: getting-started/first-steps.md
- Developing & Contributing: getting-started/developing.md
- Quickstart: getting-started/quickstart.md
- - Examples:
- - Introduction to the StreamPipes Python Client:
examples/1-introduction-to-streampipes-python-client.ipynb
- - Extracting Data from the StreamPipes Data Lake:
examples/2-extracting-data-from-the-streampipes-data-lake.ipynb
- - Exploring Live Data from a StreamPipes Data Stream:
examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
- - Applying Online Machine Learning on a StreamPipes Data Stream:
examples/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
- - Reference: reference/*
+ - 🤓 Tutorials:
+ - Introduction to the StreamPipes Python Client:
tutorials/1-introduction-to-streampipes-python-client.ipynb
+ - Extracting Data from the StreamPipes Data Lake:
tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
+ - Exploring Live Data from a StreamPipes Data Stream:
tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+ - Applying Online Machine Learning on a StreamPipes Data Stream:
tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
+ - 📚 Reference: reference/*
diff --git a/streampipes-client-python/setup.py
b/streampipes-client-python/setup.py
index 27c065114..3d555e904 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -29,6 +29,12 @@ EMAIL = "[email protected]"
AUTHOR = "Apache Software Foundation"
REQUIRES_PYTHON = ">=3.8.0"
+PROJECT_URLS = {
+ "Documentation": "https://streampipes.apache.org/docs/docs/python/latest/",
+ "Bug Tracker": "https://github.com/apache/streampipes/issues",
+ "Source Code": "https://github.com/apache/streampipes",
+}
+
# Package requirements.
base_packages = [
"pandas>=1.5.1",
@@ -90,6 +96,7 @@ setuptools.setup(
author_email=EMAIL,
python_requires=REQUIRES_PYTHON,
url=URL,
+ project_urls=PROJECT_URLS,
packages=setuptools.find_packages(exclude=("tests",)),
install_requires=base_packages,
extras_require={
diff --git a/streampipes-client-python/streampipes/client/client.py
b/streampipes-client-python/streampipes/client/client.py
index 471696c1d..d98b2d5f4 100644
--- a/streampipes-client-python/streampipes/client/client.py
+++ b/streampipes-client-python/streampipes/client/client.py
@@ -37,53 +37,73 @@ logger = logging.getLogger(__name__)
class StreamPipesClient:
"""The client to connect to StreamPipes.
+
This is the central point of contact with StreamPipes and
provides all the functionalities to interact with it.
The client provides so-called "endpoints" each of which refers to
an endpoint of the StreamPipes API, e.g. `.dataLakeMeasureApi`.
- An endpoint provides the actual methods to interact with StreamPipes
- API (see endpoint.endpoint.APIEndpoint).
+ An [endpoint][streampipes.endpoint.endpoint] provides the actual methods
to interact with StreamPipes
+ API.
Parameters
----------
client_config: StreamPipesClientConfig
Configures the client to connect properly to the StreamPipes instance.
logging_level: Optional[int]
- Influences the log messages emitted by the `StreamPipesClient`.
+ Influences the log messages emitted by the `StreamPipesClient`
+
+ Attributes
+ ----------
+ dataLakeMeasureApi: DataLakeMeasureEndpoint
+ Instance of the data lake measure endpoint
+ dataStreamApi: DataStreamEndpoint
+ Instance of the data stream endpoint
Examples
--------
- >>> from streampipes.client import StreamPipesClient
- >>> from streampipes.client.config import StreamPipesClientConfig
- >>> from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
-
- >>> client_config = StreamPipesClientConfig(
- ... credential_provider=StreamPipesApiKeyCredentials(
- ... username="test-user",
- ... api_key="api-key"
- ... ),
- ... host_address="localhost",
- ... https_disabled=True
- ... )
+ ```python
+ from streampipes.client import StreamPipesClient
+ from streampipes.client.config import StreamPipesClientConfig
+ from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
+ ```
+
+ ```python
+ client_config = StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(
+ username="test-user",
+ api_key="api-key"
+ ),
+ host_address="localhost",
+ https_disabled=True
+ )
+ ```
The following way of instantiating a client instance is
intended to be consistent with the StreamPipes Java client.
- >>> client = StreamPipesClient.create(client_config=client_config)
+ ```python
+ client = StreamPipesClient.create(client_config=client_config)
+ ```
If you prefer a more pythonic way, you can simply write:
- >>> client = StreamPipesClient(client_config=client_config)
+ ```python
+ client = StreamPipesClient(client_config=client_config)
+ ```
To interact with an endpoint:
- >>> data_lake_measures = client.dataLakeMeasureApi.all()
+ ```python
+ data_lake_measures = client.dataLakeMeasureApi.all()
+ ```
To inspect returned data as a pandas dataframe:
- >>> data_lake_measures.to_pandas()
+ ```python
+ data_lake_measures.to_pandas()
#
# measure_name timestamp_field ... pipeline_is_running
num_event_properties
# 0 test s0::timestamp ... False
2
# [1 rows x 6 columns]
+ ```
"""
@@ -99,7 +119,8 @@ class StreamPipesClient:
self.request_session = Session()
self.request_session.headers.update(self.http_headers)
- self._set_up_logging(logging_level=logging_level) # type: ignore
+ self.logging_level = logging_level
+ self._set_up_logging(logging_level=self.logging_level) # type: ignore
# provide all available endpoints here
# name of the endpoint needs to be consistent with the Java client
@@ -134,7 +155,8 @@ class StreamPipesClient:
logging_level: int = logging.INFO,
) -> StreamPipesClient:
"""Returns an instance of the `StreamPipesPythonClient`.
- Provides consistency to the Java client.
+
+ Provides consistency to the StreamPipes Java client.
Parameters
----------
@@ -151,13 +173,15 @@ class StreamPipesClient:
@property
def http_headers(self) -> Dict[str, str]:
- """Returns the HTTP headers required for all requests.
+ """Returns the HTTP headers used for all requests.
+
The HTTP headers are composed of the authentication headers supplied
by the credential
provider and additional required headers (currently this is only the
application header).
Returns
-------
- Dictionary with header information as string key-value pairs.
+ http_headers: Dict[str, str]
+ header information for HTTP requests as string key-value pairs.
"""
# create HTTP headers from credential provider and add additional
headers needed
@@ -171,7 +195,8 @@ class StreamPipesClient:
Returns
-------
- str of the basic API URL
+ base_api_path: str
+ basic API path of the connected StreamPipes instance
"""
return (
f"{'http://' if self.client_config.https_disabled else 'https://'}"
@@ -180,11 +205,26 @@ class StreamPipesClient:
)
def describe(self) -> None:
- """Prints short description of the connected StreamPipes instance and
the available resources to the console.
+ """Prints a short description of the connected StreamPipes instance
and the available resources to the console.
Returns
-------
None
+
+ Examples
+ --------
+
+ ```python
+ client.describe()
+ ```
+ Output:
+ ```
+ Hi there!
+ You are connected to a StreamPipes instance running at
http://localhost:80.
+ The following StreamPipes resources are available with this client:
+ 6x DataStreams
+ 1x DataLakeMeasures
+ ```
"""
# get all endpoints of this client
diff --git a/streampipes-client-python/streampipes/client/config.py
b/streampipes-client-python/streampipes/client/config.py
index 05e3729b1..8149b3b0c 100644
--- a/streampipes-client-python/streampipes/client/config.py
+++ b/streampipes-client-python/streampipes/client/config.py
@@ -16,7 +16,7 @@
#
"""
-Implementation of a config class for the StreamPipes client.
+Configuration class for the StreamPipes client.
"""
@@ -34,6 +34,7 @@ from streampipes.client.credential_provider import
CredentialProvider
@dataclass
class StreamPipesClientConfig:
"""Configure the StreamPipes client in accordance to the actual
StreamPipes instance to connect to.
+
An instance is provided to the `StreamPipesClient` to configure it
properly.
Parameters
@@ -46,11 +47,14 @@ class StreamPipesClientConfig:
https_disabled: Optional[bool]
Determines whether https is used to connect to StreamPipes.
port: Optional[int]
- Specifies the port under which the StreamPipes API is available, e.g.,
`80` (with http) or `443` (with https)
+ Specifies the port under which the StreamPipes API is available,
+ e.g., `80` (with http) or `443` (with https)
Examples
--------
- see `StreamPipesClient`
+
+ see [StreamPipesClient][streampipes.client.StreamPipesClient]
+
"""
credential_provider: CredentialProvider
diff --git
a/streampipes-client-python/streampipes/client/credential_provider.py
b/streampipes-client-python/streampipes/client/credential_provider.py
index 015e192f8..a29598c9f 100644
--- a/streampipes-client-python/streampipes/client/credential_provider.py
+++ b/streampipes-client-python/streampipes/client/credential_provider.py
@@ -40,6 +40,7 @@ class CredentialProvider(ABC):
def make_headers(self, http_headers: Optional[Dict[str, str]] = None) ->
Dict[str, str]:
"""Creates the HTTP headers for the specific credential provider.
+
Concrete authentication headers must be defined in the implementation
of a credential provider.
Parameters
@@ -49,7 +50,10 @@ class CredentialProvider(ABC):
Returns
-------
- Dictionary with header information as string key-value pairs.
+ http_headers: Dict[str, str]
+ Dictionary with header information as string key-value pairs. <br>
+ Contains all pairs given as parameter plus the header pairs for
authentication
+ determined by the credential provider.
"""
if http_headers is None:
@@ -73,8 +77,9 @@ class CredentialProvider(ABC):
class StreamPipesApiKeyCredentials(CredentialProvider):
- """A Credential provider that allows authentication via a StreamPipes API
Token.
- This token can be generated via the StreamPipes UI (see how in the
project's README).
+ """A credential provider that allows authentication via a StreamPipes API
Token.
+
+ The required token can be generated via the StreamPipes UI (see the
description on our [start-page](../../../)).
Parameters
----------
@@ -85,11 +90,8 @@ class StreamPipesApiKeyCredentials(CredentialProvider):
Examples
--------
- see `StreamPipesClient`
+ see [StreamPipesClient][streampipes.client.StreamPipesClient]
- References
- ----------
- [^1]: [StreamPipes Python Client
README](https://streampipes.apache.org/docs/docs/python/latest/)
"""
@classmethod
diff --git
a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 5f8929314..3566dba8d 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -17,7 +17,7 @@
"""
Specific implementation of the StreamPipes API's data lake measure endpoints.
-This endpoint allows to consume data stored in StreamPipes' data lake
+This endpoint allows to consume data stored in StreamPipes' data lake.
"""
from datetime import datetime
from typing import Any, Dict, Literal, Optional, Tuple, Type
@@ -40,11 +40,12 @@ class StreamPipesQueryValidationError(Exception):
class MeasurementGetQueryConfig(BaseModel):
- """Config class describing the parameters of the GET endpoint for
measurements.
+ """Config class describing the parameters of the `get()` method for
measurements.
This config class is used to validate the provided query parameters for
the GET endpoint of measurements.
Additionally, it takes care of the conversion to a proper HTTP query
string.
- Thereby, parameter names are adapted to the naming of the StreamPipes API,
for which Pydantic aliases are used.
+ Thereby, parameter names are adapted to the naming of the StreamPipes API,
+ for which [Pydantic
aliases](https://docs.pydantic.dev/usage/model_config/#options) are used.
Attributes
----------
@@ -160,40 +161,47 @@ class DataLakeMeasureEndpoint(APIEndpoint):
This endpoint provides an interface to all data stored in the StreamPipes
data lake.
Consequently, it allows querying metadata about available data sets (see
`all()` method).
- The metadata is returned as an instance of
`model.container.DataLakeMeasures`.
+ The metadata is returned as an instance of
[`DataLakeMeasures`][streampipes.model.container.DataLakeMeasures].
In addition, the endpoint provides direct access to the data stored in the
data lake by querying a
specific data lake measure using the `get()` method.
- Parameters
- ----------
- parent_client: StreamPipesClient
- The instance of `client.StreamPipesClient` the endpoint is attached to.
-
Examples
--------
- >>> from streampipes.client import StreamPipesClient
- >>> from streampipes.client.config import StreamPipesClientConfig
- >>> from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
-
- >>> client_config = StreamPipesClientConfig(
- ...
credential_provider=StreamPipesApiKeyCredentials(username="test-user",
api_key="api-key"),
- ... host_address="localhost",
- ... port=8082,
- ... https_disabled=True
- ... )
-
- >>> client = StreamPipesClient.create(client_config=client_config)
-
- >>> data_lake_measures = client.dataLakeMeasureApi.all()
-
- >>> len(data_lake_measures)
+ ```python
+ from streampipes.client import StreamPipesClient
+ from streampipes.client.config import StreamPipesClientConfig
+ from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
+ ```
+
+ ```python
+ client_config = StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="test-user",
api_key="api-key"),
+ host_address="localhost",
+ port=8082,
+ https_disabled=True
+ )
+ client = StreamPipesClient.create(client_config=client_config)
+ ```
+
+ ```python
+ # get all existing data lake measures from StreamPipes
+ data_lake_measures = client.dataLakeMeasureApi.all()
+
+ # let's take a look how many we got
+ len(data_lake_measures)
+ ```
+ ```
5
-
- Retrieve a specific data lake measure as a pandas DataFrame
- >>> flow_rate_pd =
client.dataLakeMeasureApi.get(identifier="flow-rate").to_pandas()
- >>> flow_rate_pd
+ ```
+
+ ```python
+ # Retrieve a specific data lake measure as a pandas DataFrame
+ flow_rate_pd =
client.dataLakeMeasureApi.get(identifier="flow-rate").to_pandas()
+ flow_rate_pd
+ ```
+ ```
time density mass_flow sensorId
sensor_fault_flags temperature volume_flow
0 2023-02-24T16:19:41.472Z 50.872730 3.309556 flowrate02
False 44.448483 5.793138
1 2023-02-24T16:19:41.482Z 47.186588 5.608580 flowrate02
False 40.322033 0.058015
@@ -206,20 +214,30 @@ class DataLakeMeasureEndpoint(APIEndpoint):
997 2023-02-24T16:19:52.952Z 45.837013 7.770180 flowrate02
False 48.188026 7.892062
998 2023-02-24T16:19:52.965Z 43.389065 4.458602 flowrate02
False 48.280899 5.733892
999 2023-02-24T16:19:52.977Z 44.056030 2.592060 flowrate02
False 47.505951 4.260697
+ ```
As you can see, the returned amount of rows per default is `1000`.
We can modify this behavior by passing the `limit` parameter.
- >>> flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate",
limit=10).to_pandas()
- >>> len(flow_rate_pd)
+ ```python
+ flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate",
limit=10).to_pandas()
+ len(flow_rate_pd)
+ ```
+ ```
+ 10
+ ```
If we are only interested in the values for `density`,
`columns` allows us to select the columns to be returned:
- >>> flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate",
columns='density', limit=3).to_pandas()
- >>> flow_rate_pd
+ ```python
+ flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate",
columns='density', limit=3).to_pandas()
+ flow_rate_pd
+ ```
+ ```
time density
0 2023-02-24T16:19:41.472Z 50.872730
1 2023-02-24T16:19:41.482Z 47.186588
2 2023-02-24T16:19:41.493Z 46.735321
+ ```
This is only a subset of the available query parameters,
find them at
[MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig].
@@ -269,23 +287,13 @@ class DataLakeMeasureEndpoint(APIEndpoint):
@property
def _container_cls(self) -> Type[ResourceContainer]:
- """Defines the model container class the endpoint refers to.
-
-
- Returns
- -------
- `model.container.DataLakeMeasures`
- """
+ """Defines the model container class the endpoint refers to."""
return DataLakeMeasures
@property
def _relative_api_path(self) -> Tuple[str, ...]:
"""Defines the relative api path to the DataLakeMeasurement endpoint.
Each path within the URL is defined as an own string.
-
- Returns
- -------
- A tuple of strings of which every represents a path value of the
endpoint's API URL.
"""
return "api", "v4", "datalake", "measurements"
@@ -294,7 +302,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
"""Queries the specified data lake measure from the API.
By default, the maximum number of returned records is 1000.
- This behaviour can be influences by passing the parameter `limit` with
a different value
+ This behaviour can be influenced by passing the parameter `limit` with
a different value
(see
[MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig]).
Parameters
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_stream.py
b/streampipes-client-python/streampipes/endpoint/api/data_stream.py
index 4780d3ea4..ce5da71d2 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_stream.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_stream.py
@@ -34,33 +34,34 @@ class DataStreamEndpoint(APIEndpoint):
"""Implementation of the DataStream endpoint.
Consequently, it allows querying metadata about available data streams
(see `all()` method).
- The metadata is returned as an instance of `model.container.DataStreams`.
-
- Parameters
- ----------
- parent_client: StreamPipesClient
- The instance of `client.StreamPipesClient` the endpoint is attached to.
+ The metadata is returned as an instance of
[`DataStreams`][streampipes.model.container.DataStreams].
Examples
--------
- >>> from streampipes.client import StreamPipesClient
- >>> from streampipes.client.config import StreamPipesClientConfig
- >>> from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
-
- >>> client_config = StreamPipesClientConfig(
- ...
credential_provider=StreamPipesApiKeyCredentials(username="test-user",
api_key="api-key"),
- ... host_address="localhost",
- ... port=8082,
- ... https_disabled=True
- ... )
-
- >>> client = StreamPipesClient.create(client_config=client_config)
-
- >>> data_streams = client.DataStreamEndpoint.all()
-
- >>> len(data_streams)
+ ```python
+ from streampipes.client import StreamPipesClient
+ from streampipes.client.config import StreamPipesClientConfig
+ from streampipes.client.credential_provider import
StreamPipesApiKeyCredentials
+
+ client_config = StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="test-user",
api_key="api-key"),
+ host_address="localhost",
+ port=8082,
+ https_disabled=True
+ )
+ client = StreamPipesClient.create(client_config=client_config)
+ ```
+
+ ```python
+ # let's get all existing data streams in StreamPipes
+ data_streams = client.dataStreamApi.all()
+ len(data_streams)
+ ```
+ ```
2
+ ```
+
"""
@property
diff --git a/streampipes-client-python/streampipes/endpoint/endpoint.py
b/streampipes-client-python/streampipes/endpoint/endpoint.py
index 780a68f42..fa96df79a 100644
--- a/streampipes-client-python/streampipes/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes/endpoint/endpoint.py
@@ -18,7 +18,7 @@
"""
General implementation for an endpoint.
Provided classes and assets are aimed to be used for developing endpoints.
-An endpoint is provides all options to communicate with a central endpoint of
the StreamPipes API in a handy way.
+An endpoint provides all options to communicate with a dedicated part of
StreamPipes in a handy way.
"""
import json
@@ -61,7 +61,7 @@ _error_code_to_message = {
class Endpoint(ABC):
- """Abstract implementation of an StreamPipes endpoint.
+ """Abstract implementation of a StreamPipes endpoint.
Serves as template for all endpoints used for interaction with a
StreamPipes instance.
By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
@@ -69,7 +69,7 @@ class Endpoint(ABC):
Parameters
----------
parent_client: StreamPipesClient
- This parameter expects the instance of the `client.StreamPipesClient`
the endpoint is attached to.
+ This parameter expects the instance of `StreamPipesClient` the
endpoint is attached to.
"""
@@ -80,13 +80,8 @@ class Endpoint(ABC):
class APIEndpoint(Endpoint):
"""Abstract implementation of an API endpoint.
- Serves as template for all endpoints for the StreamPipes API.
+ Serves as template for all endpoints of the StreamPipes API.
By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
-
- Parameters
- ----------
- parent_client: StreamPipesClient
- This parameter expects the instance of the `client.StreamPipesClient`
the endpoint is attached to.
"""
@property
@@ -169,22 +164,25 @@ class APIEndpoint(Endpoint):
return response
def build_url(self) -> str:
- """Creates the URL of the API path for the endpoint.
+ """Builds the endpoint's URL of the API path.
Returns
-------
- The URL of the Endpoint
+ url: str
+ The URL of the endpoint
"""
return f"{self._parent_client.base_api_path}" f"{'/'.join(api_path for
api_path in self._relative_api_path)}"
def all(self) -> ResourceContainer:
"""Get all resources of this endpoint provided by the StreamPipes API.
- Results are provided as an instance of a
`model.container.ResourceContainer` that
+
+ Results are provided as an instance of a `ResourceContainer` that
allows to handle the returned resources in a comfortable and pythonic
way.
Returns
-------
- A model container instance (`model.container.ResourceContainer`)
bundling the resources returned.
+ container: ResourceContainer
+ container element that bundles the returned resources
"""
response = self._make_request(
@@ -203,7 +201,8 @@ class APIEndpoint(Endpoint):
Returns
-------
- The specified resource as an instance of the corresponding model class
(`model.Resource`).
+ resource: Resource
+ The specified resource as an instance of the corresponding model
class.
"""
response = self._make_request(
@@ -213,7 +212,7 @@ class APIEndpoint(Endpoint):
return self._container_cls._resource_cls()(**response.json())
def post(self, resource: Resource) -> None:
- """Post a resource to the StreamPipes API.
+ """Allows to post a resource to the StreamPipes API.
Parameters
----------
@@ -235,15 +234,11 @@ class APIEndpoint(Endpoint):
class MessagingEndpoint(Endpoint):
"""Abstract implementation of a StreamPipes messaging endpoint.
+
Serves as template for all endpoints used for interacting with the
StreamPipes messaging layer directly.
Therefore, they need to provide the functionality to talk with the broker
system running in StreamPipes.
By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
- Parameters
- ----------
- parent_client: StreamPipesClient
- This parameter expects the instance of the `client.StreamPipesClient`
the endpoint is attached to.
-
"""
def __init__(self, parent_client: "StreamPipesClient"): # type: ignore #
noqa: F821
@@ -264,8 +259,8 @@ class MessagingEndpoint(Endpoint):
Returns
-------
- The broker instance to be used to communicate with
- StreamPipes' messaging layer.
+ broker: Broker
+ The broker instance to be used to communicate with StreamPipes'
messaging layer.
"""
if self._broker is not None:
@@ -286,8 +281,13 @@ class MessagingEndpoint(Endpoint):
This configuration step is required before the endpoint can be
actually used.
The given `broker` instance is passed to an internal property
+ Parameters
+ ----------
+ broker: Broker
+ Broker instance that should be used for this endpoint
+
Returns
- _______
+ -------
None
"""
diff --git
a/streampipes-client-python/streampipes/function_zoo/river_function.py
b/streampipes-client-python/streampipes/function_zoo/river_function.py
index ac142c989..b43b40d0c 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -30,7 +30,7 @@ from streampipes.model.resource.function_definition import
FunctionDefinition
class RiverFunction(StreamPipesFunction):
"""Implementation of a StreamPipesFunction to enable an easy usage
- for Online Machine Learning models of the River library.
+ for Online Machine Learning models of the [River
library](https://riverml.xyz/).
The function trains the model with the incoming events and publishes the
prediction to an output data stream.
@@ -78,15 +78,46 @@ class RiverFunction(StreamPipesFunction):
self.learning = True
def requiredStreamIds(self) -> List[str]:
- """Returns the the stream ids."""
+ """Returns the stream ids required by this function.
+
+ Returns
+ -------
+ stream_ids: List[str]
+ List of stream ids required by the function
+
+ """
return self.stream_ids
def onServiceStarted(self, context: FunctionContext):
- """Executes the `on_start` function."""
+ """Executes the `on_start` method of the function.
+
+ Parameters
+ ----------
+ context: FunctionContext
+ The functions' context
+
+ Returns
+ -------
+ None
+
+ """
self.on_start(self, context)
def onEvent(self, event: Dict[str, Any], streamId: str):
- """Trains the model with the incoming events and sends the prediction
back to StreamPipes."""
+ """Trains the model with the incoming events and sends the prediction
back to StreamPipes.
+
+ Parameters
+ ----------
+ event: Dict[str, Any]
+ The incoming event that serves as input for the function
+ streamId: str
+ Identifier of the corresponding data stream
+
+ Returns
+ -------
+ None
+
+ """
self.on_event(self, event, streamId)
output_event = {}
if self.supervised:
@@ -111,7 +142,7 @@ class OnlineML:
"""Wrapper class to enable an easy usage for Online Machine Learning
models of the River library.
It creates a StreamPipesFunction to train a model with the incoming events
of a data stream and
- creates an output data stream to publishes the prediction to StreamPipes.
+ creates an output data stream that publishes the prediction to StreamPipes.
Parameters
----------
@@ -180,5 +211,5 @@ class OnlineML:
self.sp_function.learning = learning
def stop(self):
- """Stops the function and ends the training for ever."""
+ """Stops the function and ends the training forever."""
self.function_handler.disconnect()
diff --git a/streampipes-client-python/streampipes/functions/broker/broker.py
b/streampipes-client-python/streampipes/functions/broker/broker.py
index 711bad65a..7697bda11 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker.py
@@ -23,20 +23,18 @@ from streampipes.model.resource.data_stream import
DataStream
class Broker(ABC):
"""Abstract implementation of a broker.
- A broker is used to subscribe to a data stream and to consume the
published events.
+
+ A broker allows both to subscribe to a data stream and to publish events
to a data stream.
"""
async def connect(self, data_stream: DataStream) -> None:
- """Connects the broker to a server.
+ """Connects to the broker running in StreamPipes.
Parameters
----------
- data_stream: DataStream
+ data_stream: DataStream
Contains the meta information (resources) for a data stream.
- host_address: str
- The host address of the server, which the broker connects to.
-
Returns
-------
None
@@ -55,10 +53,8 @@ class Broker(ABC):
Parameters
----------
-
hostname: str
- The hostname of the of the server, which the broker connects to.
-
+ The hostname of the server, which the broker connects to.
port: int
The port number of the connection.
@@ -79,12 +75,12 @@ class Broker(ABC):
raise NotImplementedError # pragma: no cover
@abstractmethod
- async def publish_event(self, event: Dict[str, Any]):
+ async def publish_event(self, event: Dict[str, Any]) -> None:
"""Publish an event to a connected data stream.
Parameters
----------
- event: Dict[str, Any]
+ event: Dict[str, Any]
The event to be published.
Returns
@@ -109,6 +105,7 @@ class Broker(ABC):
Returns
-------
- An async iterator for the messages.
+ iterator: AsyncIterator
+ An async iterator for the messages.
"""
raise NotImplementedError # pragma: no cover
diff --git
a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 3aac0f986..6bf070486 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -28,7 +28,7 @@ class SupportedBroker(Enum):
# TODO Exception should be removed once all brokers are implemented.
-class UnsupportedBroker(Exception):
+class UnsupportedBrokerError(Exception):
"""Exception if a broker isn't implemented yet."""
def __init__(self, message):
@@ -36,16 +36,22 @@ class UnsupportedBroker(Exception):
def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for
more transport_protocols
- """Get a broker by a name.
+ """Derive the broker for the given data stream.
Parameters
----------
- broker_name: str
- A string that represents a broker.
+ data_stream: DataStream
+ Data stream instance from which the broker is inferred
Returns
-------
- The broker which belongs to the name.
+ broker: Broker
+ The corresponding broker instance derived from data stream.
+
+ Raises
+ ------
+ UnsupportedBrokerError
+ Is raised when the given data stream belongs to a broker that is
currently not supported by StreamPipes Python.
"""
broker_name = data_stream.event_grounding.transport_protocols[0].class_name
if SupportedBroker.NATS.value in broker_name:
@@ -53,4 +59,4 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO
implementation for mo
elif SupportedBroker.KAFKA.value in broker_name:
return KafkaBroker()
else:
- raise UnsupportedBroker(f'The python client doesn\'t include the
broker: "{broker_name}" yet')
+ raise UnsupportedBrokerError(f'The python client doesn\'t include the
broker: "{broker_name}" yet')
diff --git
a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
index 900d7f6c1..9bc1337a2 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
class KafkaBroker(Broker):
- """Implementation of the NatsBroker"""
+ """Implementation of the broker for Kafka"""
async def _makeConnection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
@@ -36,7 +36,7 @@ class KafkaBroker(Broker):
----------
hostname: str
- The hostname of the of the server, which the broker connects to.
+ The hostname of the server, which the broker connects to.
port: int
The port number of the connection.
@@ -65,7 +65,7 @@ class KafkaBroker(Broker):
Parameters
----------
- event: Dict[str, Any]
+ event: Dict[str, Any]
The event to be published.
Returns
@@ -90,7 +90,8 @@ class KafkaBroker(Broker):
Returns
-------
- An async iterator for the messages.
+ iterator: AsyncIterator
+ An async iterator for the messages.
"""
return KafkaMessageFetcher(self.kafka_consumer)
diff --git
a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
index 56bbf46c9..d8e9fc3d2 100644
---
a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
+++
b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
@@ -17,12 +17,12 @@
from confluent_kafka import Consumer # type: ignore
-class KafkaMsg:
+class KafkaMessage:
"""An internal representation of a Kafka message
Parameters
----------
- data: Byte Array
+ data: bytes
The received Kafka message as byte array
"""
@@ -47,4 +47,4 @@ class KafkaMessageFetcher:
async def __anext__(self):
msg = self.consumer.poll(0.1)
- return KafkaMsg(msg.value())
+ return KafkaMessage(msg.value())
diff --git
a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
b/streampipes-client-python/streampipes/functions/broker/nats_broker.py
index a1a00a99c..238e14bfd 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats_broker.py
@@ -34,7 +34,7 @@ class NatsBroker(Broker):
----------
hostname: str
- The hostname of the of the server, which the broker connects to.
+ The hostname of the server, which the broker connects to.
port: int
The port number of the connection.
@@ -42,6 +42,7 @@ class NatsBroker(Broker):
Returns
-------
None
+
"""
self.nats_client = await connect(f"nats://{hostname}:{port}")
if self.nats_client.connected_url is not None:
@@ -53,6 +54,7 @@ class NatsBroker(Broker):
Returns
-------
None
+
"""
self.subscription = await self.nats_client.subscribe(self.topic_name)
logger.info(f"Subscribed to stream: {self.stream_id}")
@@ -62,12 +64,13 @@ class NatsBroker(Broker):
Parameters
----------
- event: Dict[str, Any]
+ event: Dict[str, Any]
The event to be published.
Returns
-------
None
+
"""
await self.nats_client.publish(subject=self.topic_name,
payload=json.dumps(event).encode("utf-8"))
@@ -86,6 +89,7 @@ class NatsBroker(Broker):
Returns
-------
- An async iterator for the messages.
+ message_iterator: AsyncIterator
+ An async iterator for the messages.
"""
return self.subscription.messages
diff --git
a/streampipes-client-python/streampipes/functions/broker/output_collector.py
b/streampipes-client-python/streampipes/functions/broker/output_collector.py
index 4cd7afe89..fcae7d55e 100644
--- a/streampipes-client-python/streampipes/functions/broker/output_collector.py
+++ b/streampipes-client-python/streampipes/functions/broker/output_collector.py
@@ -22,13 +22,19 @@ from streampipes.model.resource.data_stream import
DataStream
class OutputCollector:
- """Collector for output events.The events are published to an output data
stream.
- Therefore the output collector establishes a connection to the broker.
+ """Collector for output events. The events are published to an output data
stream.
+ Therefore, the output collector establishes a connection to the broker.
Parameters
----------
- data_stream: DataStream
- The output data stream that will receive the events.
+ data_stream: DataStream
+ The output data stream that will receive the events.
+
+ Attributes
+ ----------
+ broker: Broker
+ The broker instance that sends the data to StreamPipes
+
"""
def __init__(self, data_stream: DataStream) -> None:
@@ -42,14 +48,24 @@ class OutputCollector:
----------
event: Dict[str, Any]
The event to be published.
+
+ Returns
+ -------
+ None
"""
self._run_coroutine(self.broker.publish_event(event))
def disconnect(self) -> None:
- """Disconnects the broker of the output collector."""
+ """Disconnects the broker of the output collector.
+
+ Returns
+ -------
+ None
+ """
self._run_coroutine(self.broker.disconnect())
- def _run_coroutine(self, coroutine: Coroutine) -> None:
+ @staticmethod
+ def _run_coroutine(coroutine: Coroutine) -> None:
"""Run a coroutine in the event loop or create a new one if there is a
running event loop.
Parameters
diff --git
a/streampipes-client-python/streampipes/functions/function_handler.py
b/streampipes-client-python/streampipes/functions/function_handler.py
index 0234ce0f2..83b88669b 100644
--- a/streampipes-client-python/streampipes/functions/function_handler.py
+++ b/streampipes-client-python/streampipes/functions/function_handler.py
@@ -31,7 +31,8 @@ logger = logging.getLogger(__name__)
class FunctionHandler:
- """The function handler manages the StreamPipesFunctions.
+ """The function handler manages the StreamPipes Functions.
+
It controls the connection to the brokers, starts the functions, manages
the broadcast of the live data
and is able to stop the connection to the brokers and functions.
@@ -41,6 +42,13 @@ class FunctionHandler:
The registration, that contains the StreamPipesFunctions.
client: StreamPipesClient
The client to interact with the API.
+
+ Attributes
+ ----------
+ stream_contexts: Dict[str, DataStreamContext]
+ Map of all data stream contexts
+ brokers: List[Broker]
+ List of all registered brokers
"""
def __init__(self, registration: Registration, client: StreamPipesClient)
-> None:
@@ -50,7 +58,7 @@ class FunctionHandler:
self.brokers: List[Broker] = []
def initializeFunctions(self) -> None:
- """Creates the context for every data stream and starts the event loop
to manage the StreamPipesFunctions.
+ """Creates the context for every data stream and starts the event loop
to manage the StreamPipes Functions.
Returns
-------
@@ -149,7 +157,7 @@ class FunctionHandler:
-------
None
- Raises
+ Warns
------
UserWarning
If there is a running event loop and the functions should be
stopped by disconnecting from the broker.
@@ -164,7 +172,7 @@ class FunctionHandler:
)
def disconnect(self) -> None:
- """Disconnects from the brokers and stops the functions.
+ """Disconnects from the brokers and stops all functions.
Returns
-------
diff --git a/streampipes-client-python/streampipes/functions/registration.py
b/streampipes-client-python/streampipes/functions/registration.py
index 8d3d05934..e77bd8ad8 100644
--- a/streampipes-client-python/streampipes/functions/registration.py
+++ b/streampipes-client-python/streampipes/functions/registration.py
@@ -20,7 +20,14 @@ from streampipes.functions.streampipes_function import
StreamPipesFunction
class Registration:
- """Manages the existing StreamPipesFunctions and registers them."""
+ """Manages the existing StreamPipesFunctions and registers them.
+
+ Attributes
+ ----------
+ functions: List[StreamPipesFunction]
+ List of all registered StreamPipesFunction
+
+ """
def __init__(self) -> None:
self.functions: List[StreamPipesFunction] = []
@@ -35,7 +42,8 @@ class Registration:
Returns
-------
- Registration
+ self: Registration
+ The updated Registration instance
"""
self.functions.append(streampipes_function) # TODO register function
to AdminAPI + consul
return self
@@ -43,8 +51,11 @@ class Registration:
def getFunctions(self) -> List[StreamPipesFunction]:
"""Get all registered functions.
+ This method exists to be consistent with the Java client.
+
Returns
-------
- List of the functions.
+ functions: List[StreamPipesFunction]
+ List of all registered functions.
"""
return self.functions
diff --git
a/streampipes-client-python/streampipes/functions/streampipes_function.py
b/streampipes-client-python/streampipes/functions/streampipes_function.py
index b6b62e38b..bfb267c4a 100644
--- a/streampipes-client-python/streampipes/functions/streampipes_function.py
+++ b/streampipes-client-python/streampipes/functions/streampipes_function.py
@@ -21,6 +21,7 @@ from typing import Any, Dict, List, Optional
from streampipes.functions.broker.output_collector import OutputCollector
from streampipes.functions.utils.function_context import FunctionContext
from streampipes.model.resource import FunctionDefinition
+from streampipes.model.resource.function_definition import FunctionId
class StreamPipesFunction(ABC):
@@ -34,6 +35,11 @@ class StreamPipesFunction(ABC):
----------
function_definition: FunctionDefinition
the definition of the function that contains metadata about the
connected function
+
+ Attributes
+ ----------
+ output_collectors: Dict[str, OutputCollector]
+ List of all output collectors which are created based on the provided
function definitions.
"""
def __init__(self, function_definition: Optional[FunctionDefinition] =
None):
@@ -51,17 +57,21 @@ class StreamPipesFunction(ABC):
stream_id: str
The id of the output data stream
event: Dict[str, Any]
- The event which should be sended
+ The event which should be sent
+
+ Returns
+ -------
+ None
"""
event["timestamp"] = int(1000 * time())
self.output_collectors[stream_id].collect(event)
- def getFunctionId(self) -> FunctionDefinition.FunctionId:
+ def getFunctionId(self) -> FunctionId:
"""Returns the id of the function.
Returns
-------
- FunctionId: FunctionDefinition.FunctionId
+ function_id: FunctionId
Identification object of the StreamPipes function
"""
return self.function_definition.function_id
@@ -79,7 +89,8 @@ class StreamPipesFunction(ABC):
Returns
-------
- List of the stream ids
+ stream_ids: List[str]
+ List of the stream ids
"""
raise NotImplementedError # pragma: no cover
diff --git
a/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
b/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
index 589ff351d..2b69dfd6d 100644
---
a/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
+++
b/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
@@ -19,24 +19,23 @@ from typing import Any, AsyncGenerator, AsyncIterator,
Dict, Tuple
class AsyncIterHandler:
- """Handels asyncrone iterators to get every message after an other in
parallel."""
+ """Handles asynchronous iterators to get every message after another in
parallel."""
@staticmethod
async def anext(stream_id: str, message: AsyncIterator) -> Tuple[str, Any]:
- """Gets the next message from an AsncIterator.
+ """Gets the next message from an AsyncIterator.
Parameters
----------
-
stream_id: str
The id of the data stream which the message belongs to.
-
message: AsyncIterator
- An asyncrone iterator that contains the messages.
+ An asynchronous iterator that contains the messages.
Returns
-------
- Tuple of the stream id und next message or ("stop", None) if no
message is left.
+ result: Tuple[str, Optional[Any]]
+ Tuple of the stream id and next message or `("stop", None)` if no
message is left.
"""
try:
return stream_id, await message.__anext__()
@@ -45,17 +44,17 @@ class AsyncIterHandler:
@staticmethod
async def combine_async_messages(messages: Dict[str, AsyncIterator]) ->
AsyncGenerator:
- """Continuously gets the next published message from multiple
AsncIterators in parallel.
+ """Continuously gets the next published message from multiple
AsyncIterators in parallel.
Parameters
----------
-
messages: Dict[str, AsyncIterator]
- A dictonary with an asyncrone iterator for every stream id.
+ A dictionary with an asynchronous iterator for every stream id.
- Returns
- -------
- Generator that returns all recieved messages continuously.
+ Yields
+ ------
+ message: Tuple[str, Any]
+ Tuple of the stream id and the next received message.
"""
pending = {AsyncIterHandler.anext(stream_id, message) for stream_id,
message in messages.items()}
while pending:
diff --git
a/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
b/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
index 7e5d39b72..2132b5071 100644
---
a/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
+++
b/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
@@ -27,7 +27,7 @@ class DataStreamContext:
Parameters
----------
functions: List[StreamPipesFunction]
- StreamPipesFunctions which require the data of this data stream.
+ StreamPipes Functions which require the data of this data stream.
schema: DataStream
The schema of this data stream.
broker: Broker
@@ -40,7 +40,7 @@ class DataStreamContext:
self.broker = broker
def add_function(self, function: StreamPipesFunction):
- """Adds a new StreamPipesFunction.
+ """Adds a new StreamPipes Function.
Parameters
----------
diff --git
a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index e11657e44..3387b5826 100644
---
a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++
b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -23,7 +23,17 @@ from streampipes.model.resource.data_stream import DataStream
class RuntimeType(Enum):
- """Runtime types for the attributes of a data stream."""
+ """Runtime type names for the attributes of a data stream.
+
+ Attributes
+ ----------
+ STRING
+ BOOLEAN
+ DOUBLE
+ FLOAT
+ INTEGER
+ LONG
+ """
STRING = "string"
BOOLEAN = "boolean"
@@ -45,6 +55,11 @@ def create_data_stream(name: str, attributes: Dict[str,
str], stream_id: Optiona
Name and types of the attributes.
stream_id: str
The id of this data stream.
+
+ Returns
+ -------
+ data_stream: DataStream
+ The created data stream
"""
event_schema = EventSchema(
diff --git
a/streampipes-client-python/streampipes/functions/utils/function_context.py
b/streampipes-client-python/streampipes/functions/utils/function_context.py
index 067047688..a4ef37141 100644
--- a/streampipes-client-python/streampipes/functions/utils/function_context.py
+++ b/streampipes-client-python/streampipes/functions/utils/function_context.py
@@ -28,22 +28,20 @@ class FunctionContext:
function_id: str
The id of this function.
schema: Dict[str, DataStream]
- A dictonary which contains the schema of a data stream for each stream
id.
+ A dictionary which contains the schema of a data stream for each
stream id.
client: StreamPipesClient
The client to interact with the API.
streams: List[str]
The ids of the streams needed by this function.
"""
- def __init__(
- self, function_id: str, schema: Dict[str, DataStream], client:
StreamPipesClient, streams: List[str]
- ) -> None:
+ def __init__(self, function_id: str, schema: Dict[str, DataStream],
client: StreamPipesClient, streams: List[str]):
self.function_id = function_id
self.schema = schema
self.client = client
self.streams = streams
- def add_data_stream_schema(self, stream_id: str, data_stream: DataStream):
+ def add_data_stream_schema(self, stream_id: str, data_stream: DataStream)
-> None:
"""Adds a new data stream for a new stream id.
Parameters
@@ -56,5 +54,6 @@ class FunctionContext:
Returns
-------
None
+
"""
self.schema[stream_id] = data_stream
diff --git a/streampipes-client-python/streampipes/model/common.py
b/streampipes-client-python/streampipes/model/common.py
index a6c21aa84..c947b037f 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -33,19 +33,36 @@ __all__ = [
]
-def random_letters(n: int):
- """Generates n random letters.
+def random_letters(n: int) -> str:
+ """Generates a string consisting of random letters.
Parameters
----------
n: int
number of letters
+
+ Returns
+ -------
+ rand_str: str
+ String consisting of `n` random letters
"""
return "".join(random.choice(string.ascii_letters) for _ in range(n))
def _snake_to_camel_case(snake_case_string: str) -> str:
- """Converts a string in snake_case format to camelCase style."""
+ """Converts a string in snake_case format to camelCase style.
+
+ Parameters
+ ----------
+ snake_case_string: str
+ string in snake_case format
+
+ Returns
+ -------
+ camel_case: str
+ The exact same string formatted as camelCase
+
+ """
tokens = snake_case_string.split("_")
diff --git
a/streampipes-client-python/streampipes/model/container/data_lake_measures.py
b/streampipes-client-python/streampipes/model/container/data_lake_measures.py
index 6a6af7511..8cb5bd159 100644
---
a/streampipes-client-python/streampipes/model/container/data_lake_measures.py
+++
b/streampipes-client-python/streampipes/model/container/data_lake_measures.py
@@ -31,15 +31,11 @@ __all__ = [
class DataLakeMeasures(ResourceContainer):
"""Implementation of the resource container for the data lake measures
endpoint.
+
This resource container is a collection of data lake measures returned by
the StreamPipes API.
It is capable of parsing the response content directly into a list of
queried `DataLakeMeasure`.
Furthermore, the resource container makes them accessible in a pythonic
manner.
- Parameters
- ----------
- resources: List[DataLakeMeasure]
- A list of resources (`model.resource.DataLakeMeasure`) to be contained
in the `ResourceContainer`.
-
"""
@classmethod
@@ -48,6 +44,7 @@ class DataLakeMeasures(ResourceContainer):
Returns
-------
- DataLakeMeasure
+ type: DataLakeMeasure
+ class that describes an individual resource
"""
return DataLakeMeasure
diff --git
a/streampipes-client-python/streampipes/model/container/data_streams.py
b/streampipes-client-python/streampipes/model/container/data_streams.py
index da68f7bef..3112b7599 100644
--- a/streampipes-client-python/streampipes/model/container/data_streams.py
+++ b/streampipes-client-python/streampipes/model/container/data_streams.py
@@ -31,15 +31,11 @@ __all__ = [
class DataStreams(ResourceContainer):
"""Implementation of the resource container for the data stream endpoint.
+
This resource container is a collection of data streams returned by the
StreamPipes API.
It is capable of parsing the response content directly into a list of
queried `DataStream`.
Furthermore, the resource container makes them accessible in a pythonic
manner.
- Parameters
- ----------
- resources: List[DataStream]
- A list of resources (`model.resource.DataStream`) to be contained in
the `ResourceContainer`.
-
"""
@classmethod
@@ -48,6 +44,7 @@ class DataStreams(ResourceContainer):
Returns
-------
- DataStream
+ type: DataStream
+ class that defines the contained resource
"""
return DataStream
diff --git
a/streampipes-client-python/streampipes/model/container/resource_container.py
b/streampipes-client-python/streampipes/model/container/resource_container.py
index 331caeda9..27016896e 100644
---
a/streampipes-client-python/streampipes/model/container/resource_container.py
+++
b/streampipes-client-python/streampipes/model/container/resource_container.py
@@ -17,6 +17,7 @@
"""
General and abstract implementation for a resource container.
+
A resource container is a collection of resources returned by the StreamPipes
API.
It is capable of parsing the response content directly into a list of queried
resources.
Furthermore, the resource container makes them accessible in a pythonic manner.
@@ -116,6 +117,7 @@ class StreamPipesResourceContainerJSONError(Exception):
class ResourceContainer(ABC):
"""General and abstract implementation for a resource container.
+
A resource container is a collection of resources returned by the
StreamPipes API.
It is capable of parsing the response content directly into a list of
queried resources.
Furthermore, the resource container makes them accessible in a pythonic
manner.
@@ -123,7 +125,7 @@ class ResourceContainer(ABC):
Parameters
----------
resources: List[Resource]
- A list of resources (`model.resource.Resource`) to be contained in the
`ResourceContainer`.
+ A list of resources to be contained in the `ResourceContainer`.
"""
@@ -147,7 +149,8 @@ class ResourceContainer(ABC):
Returns
-------
- model.resource.Resource
+ cls: Resource
+ class that defines the resource type contained by the container
"""
raise NotImplementedError # pragma: no cover
@@ -162,7 +165,8 @@ class ResourceContainer(ABC):
Returns
-------
- ResourceContainer
+ container: ResourceContainer
+ instance of the container derived from the JSON definition
Raises
------
@@ -209,7 +213,7 @@ class ResourceContainer(ABC):
Returns
-------
- JSON string: str
+ json_string: str
JSON representation of the resource container where key names are
equal to
keys used in the StreamPipes backend
"""
@@ -222,6 +226,7 @@ class ResourceContainer(ABC):
Returns
-------
resource_container_df: pd.DataFrame
+ Representation of the resource container as pandas DataFrame
"""
return pd.DataFrame.from_records(
# ResourceContainer is iterable itself via __get_item__
diff --git
a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
index d72732afa..3d014ff92 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
@@ -27,19 +27,29 @@ __all__ = [
class DataLakeMeasure(Resource):
"""Implementation of a resource for data lake measures.
+
This resource defines the data model used by resource container
(`model.container.DataLakeMeasures`).
It inherits from Pydantic's BaseModel to get all its superpowers,
- which are used to parse, validate the API response and to easily switch
between
+ which are used to parse, validate the API response, and to easily switch
between
the Python representation (both serialized and deserialized) and Java
representation (serialized only).
"""
def convert_to_pandas_representation(self):
"""Returns the dictionary representation of a data lake measure
to be used when creating a pandas Dataframe.
+
It excludes the following fields: `element_id`, `event_schema`,
`schema_version`.
Instead of the whole event schema the number of event properties
contained
is returned with the column name `num_event_properties`.
+
+ Returns
+ -------
+ pandas_repr: Dict[str, Any]
+ Pandas representation of the resource as a dictionary, which is
then used by the resource container
+ to create a data frame from a collection of resources.
+
"""
+
return {
**self.dict(exclude={"element_id", "event_schema",
"schema_version"}),
"num_event_properties": len(self.event_schema.event_properties),
diff --git
a/streampipes-client-python/streampipes/model/resource/data_lake_series.py
b/streampipes-client-python/streampipes/model/resource/data_lake_series.py
index 7d4ab8dc7..a09026ad1 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_series.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_series.py
@@ -48,9 +48,11 @@ class DataLakeSeries(Resource):
which are used to parse, validate the API response and to easily switch
between
the Python representation (both serialized and deserialized) and Java
representation (serialized only).
- NOTE:
+ Notes
+ -----
This class will only exist temporarily in its current appearance since
there are some inconsistencies in the StreamPipes API.
+
"""
@classmethod
@@ -76,6 +78,7 @@ class DataLakeSeries(Resource):
StreamPipesUnsupportedDataLakeSeries
If the data lake series returned by the StreamPipes API cannot be
parsed
with the current version of the Python client.
+
"""
# deserialize JSON string
@@ -99,7 +102,7 @@ class DataLakeSeries(Resource):
Returns
-------
- Dictionary
+ pandas_repr: dict[str, Any]
Dictionary with the keys `headers` and `rows`
"""
@@ -115,7 +118,8 @@ class DataLakeSeries(Resource):
Returns
-------
- pd.DataFrame
+ pd: pd.DataFrame
+ The data lake series in form of a pandas dataframe
"""
pandas_representation = self.convert_to_pandas_representation()
diff --git
a/streampipes-client-python/streampipes/model/resource/data_stream.py
b/streampipes-client-python/streampipes/model/resource/data_stream.py
index d053eb574..6eadc0318 100644
--- a/streampipes-client-python/streampipes/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes/model/resource/data_stream.py
@@ -34,10 +34,12 @@ __all__ = [
class DataStream(Resource):
"""Implementation of a resource for data streams.
+
This resource defines the data model used by resource container
(`model.container.DataStreams`).
It inherits from Pydantic's BaseModel to get all its superpowers,
which are used to parse, validate the API response and to easily switch
between
the Python representation (both serialized and deserialized) and Java
representation (serialized only).
+
"""
def convert_to_pandas_representation(self):
@@ -45,7 +47,10 @@ class DataStream(Resource):
Returns
-------
- Dictionary
+ pandas_repr: Dict[str, Any]
+ Pandas representation of the resource as a dictionary, which is
then used by the resource container
+ to create a data frame from a collection of resources.
+
"""
return {
**self.dict(
diff --git
a/streampipes-client-python/streampipes/model/resource/function_definition.py
b/streampipes-client-python/streampipes/model/resource/function_definition.py
index 53f079ea7..bc09167d5 100644
---
a/streampipes-client-python/streampipes/model/resource/function_definition.py
+++
b/streampipes-client-python/streampipes/model/resource/function_definition.py
@@ -28,6 +28,27 @@ from streampipes.model.resource.data_stream import DataStream
from streampipes.model.resource.resource import Resource
+class FunctionId(BasicModel):
+ """Identification object for a StreamPipes function.
+
+ Maps to the `FunctionId` class defined in the StreamPipes model.
+
+ Parameters
+ ----------
+ id: str
+ unique identifier of the function instance
+ version: int
+ version of the corresponding function
+
+ """
+
+ id: StrictStr = Field(default_factory=lambda: str(uuid4()))
+ version: StrictInt = Field(default=1)
+
+ def __hash__(self):
+ return hash((self.id, self.version))
+
+
class FunctionDefinition(Resource):
"""Configuration for a StreamPipes Function.
@@ -36,15 +57,31 @@ class FunctionDefinition(Resource):
Parameters
----------
+ consumed_streams: List[str]
+ List of data streams the function is consuming from
function_id: FunctionId
identifier object of a StreamPipes function
- consumed_streams: List[str]
- list of data streams the function is consuming from
+
+ Attributes
+ ----------
+ output_data_streams: Dict[str, DataStream]
+ Map of all output data streams added to the function definition
+
"""
+ function_id: FunctionId = Field(default_factory=FunctionId)
+ consumed_streams: List[str] = Field(default_factory=list)
+ output_data_streams: Dict[str, DataStream] = Field(default_factory=dict)
+
def convert_to_pandas_representation(self) -> Dict:
"""Returns the dictionary representation of a function definition
to be used when creating a pandas Dataframe.
+
+ Returns
+ -------
+ pandas_repr: Dict[str, Any]
+ Pandas representation of the resource as a dictionary, which is
then used by the resource container
+ to create a data frame from a collection of resources.
"""
return self.to_dict(use_source_names=False)
@@ -56,7 +93,14 @@ class FunctionDefinition(Resource):
----------
data_stream: DataStream
The schema of the output data stream.
+
+ Returns
+ -------
+ self: FunctionDefinition
+ Instance of the function definition that is extended by the
provided `DataStream`
+
"""
+
self.output_data_streams[data_stream.element_id] = data_stream
return self
@@ -65,8 +109,11 @@ class FunctionDefinition(Resource):
Returns
-------
- Dictonary with every stream id and the related output stream.
+ output_streams: Dict[str, DataStream]
+ Dictionary with every known stream id and the related output
stream.
+
"""
+
return self.output_data_streams
def get_output_stream_ids(self) -> List[str]:
@@ -74,29 +121,9 @@ class FunctionDefinition(Resource):
Returns
-------
- List of all stream ids.
- """
- return list(self.output_data_streams.keys())
+ output_stream_ids: List[str]
+ List of all stream ids
- class FunctionId(BasicModel):
- """Identification object for a StreamPipes function.
-
- Maps to the `FunctionId` class defined in the StreamPipes model.
-
- Parameters
- ----------
- id: str
- unique identifier of the function instance
- version: int
- version of the corresponding function
"""
- id: StrictStr = Field(default_factory=lambda: str(uuid4()))
- version: StrictInt = Field(default=1)
-
- def __hash__(self):
- return hash((self.id, self.version))
-
- function_id: FunctionId = Field(default_factory=FunctionId)
- consumed_streams: List[str] = Field(default_factory=list)
- output_data_streams: Dict[str, DataStream] = Field(default_factory=dict)
+ return list(self.output_data_streams.keys())
diff --git a/streampipes-client-python/streampipes/model/resource/resource.py
b/streampipes-client-python/streampipes/model/resource/resource.py
index 0fef1d6be..ab1faca88 100644
--- a/streampipes-client-python/streampipes/model/resource/resource.py
+++ b/streampipes-client-python/streampipes/model/resource/resource.py
@@ -17,6 +17,7 @@
"""
General and abstract implementation for a resource.
+
A resource defines the data model that is used by a resource container
(`model.container.resourceContainer`).
"""
from abc import ABC, abstractmethod
@@ -36,11 +37,19 @@ class Resource(ABC, BasicModel):
It inherits from Pydantic's BaseModel to get all its superpowers,
which are used to parse, validate the API response and to easily switch
between
the Python representation (both serialized and deserialized) and Java
representation (serialized only).
+
"""
@abstractmethod
def convert_to_pandas_representation(self) -> Dict:
- """Returns a dictionary representation to be used when creating a
pandas Dataframe."""
+ """Returns a dictionary representation to be used when creating a
pandas Dataframe.
+
+ Returns
+ -------
+ pandas_repr: Dict[str, Any]
+ Pandas representation of the resource as a dictionary, which is
then used by the resource container
+ to create a data frame from a collection of resources.
+ """
raise NotImplementedError # pragma: no cover
def to_dict(self, use_source_names=True):