This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
new 2303a0b16b Adjust python client
2303a0b16b is described below
commit 2303a0b16bd0bbd882b54fe2e7a77afd3b05b730
Author: Sven Oehler <[email protected]>
AuthorDate: Mon Aug 18 18:07:04 2025 +0200
Adjust python client
---
.../streampipes/endpoint/api/data_lake_measure.py | 18 ++++++--
.../streampipes/model/resource/query_result.py | 49 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 1ef107b1e7..2be1aa0dc1 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -20,6 +20,7 @@ Specific implementation of the StreamPipes API's data lake
measure endpoints.
This endpoint allows to consume data stored in StreamPipes' data lake.
"""
from datetime import datetime
+from json import dumps
from typing import Any, Dict, List, Literal, Optional, Tuple, Type
from pydantic.v1 import BaseModel, Extra, Field, StrictInt, ValidationError,
validator
@@ -199,7 +200,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
Consequently, it allows querying metadata about available data sets (see
`all()` method).
The metadata is returned as an instance of
[`DataLakeMeasures`][streampipes.model.container.DataLakeMeasures].
- In addition, the endpoint provides direct access to the data stored in the
data laka by querying a
+ In addition, the endpoint provides direct access to the data stored in the
data lake by querying a
specific data lake measure using the `get()` method.
Examples
@@ -253,7 +254,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
```
As you can see, the returned amount of rows per default is `1000`.
- We can modify this behavior by passing the `limit` paramter.
+ We can modify this behavior by passing the `limit` parameter.
```python
flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate",
limit=10).to_pandas()
len(flow_rate_pd)
@@ -347,7 +348,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
"""Queries the specified data lake measure from the API.
By default, the maximum number of returned records is 1000.
- This behaviour can be influenced by passing the parameter `limit` with
a different value
+ This behavior can be influenced by passing the parameter `limit` with
a different value
(see
[MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig]).
Parameters
@@ -369,7 +370,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
see directly at
[DataLakeMeasureEndpoint][streampipes.endpoint.api.data_lake_measure.DataLakeMeasureEndpoint].
"""
- # bild base URL for resource
+ # build base URL for resource
url = f"{self.build_url()}/{identifier}"
# extend base URL by query parameters
@@ -378,3 +379,12 @@ class DataLakeMeasureEndpoint(APIEndpoint):
response =
self._make_request(request_method=self._parent_client.request_session.get,
url=url)
return self._resource_cls(**response.json())
+
def post(self, identifier: str, query_result: QueryResult) -> None:
    """Upload data to a specific data lake measure.

    The query result is converted to a dictionary keyed by source names
    (`use_source_names=True`), serialized to a JSON string and sent as the
    body of a POST request to the measure-specific URL of this endpoint.
    The response of the request is not returned.

    Parameters
    ----------
    identifier: str
        Identifier of the data lake measure that receives the data.
    query_result: QueryResult
        The data to upload, e.g. built with `QueryResult.from_pandas()`.

    Returns
    -------
    None
    """
    # Target URL follows the same `<base>/<identifier>` scheme as `get()`.
    target_url = f"{self.build_url()}/{identifier}"
    # The body is serialized by hand, so the JSON content type is set
    # explicitly instead of relying on the HTTP client to infer it.
    payload = dumps(query_result.to_dict(use_source_names=True))
    json_headers = {"Content-type": "application/json"}

    self._make_request(
        request_method=self._parent_client.request_session.post,
        url=target_url,
        data=payload,
        headers=json_headers,
    )
+
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
index fbd613e9ce..47a89aeee9 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -91,3 +91,52 @@ class QueryResult(Resource):
df = pd.DataFrame(data=pandas_representation["rows"],
columns=pandas_representation["headers"])
return df
+
@classmethod
def from_pandas(
    cls,
    df: pd.DataFrame,
    source_index: int = 0,
    for_id: Optional[str] = None,
    query_status: Literal["OK", "TOO_MUCH_DATA"] = "OK",
) -> "QueryResult":
    """Create a QueryResult object from a pandas DataFrame.

    The whole DataFrame becomes a single data series. Its column names are
    used as headers, and each DataFrame row becomes one row of values. The
    input DataFrame is not modified.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to convert into a QueryResult. Its first column must be
        named ``timestamp`` and must be castable to ``int64``.
    source_index : int, optional
        Source index of the query result (default is 0).
    for_id : str, optional
        Optional identifier for the query result.
    query_status : Literal["OK", "TOO_MUCH_DATA"], optional
        Query status to set in the QueryResult (default is "OK").

    Returns
    -------
    QueryResult
        A QueryResult object representing the DataFrame. ``total`` is the
        number of rows, and ``last_timestamp`` is the largest value of the
        ``timestamp`` column.

    Raises
    ------
    ValueError
        If `df` is empty, or if pandas cannot cast the ``timestamp`` column
        to ``int64`` (e.g. because it contains missing values).
    StreamPipesUnsupportedDataSeries
        If the first column of `df` is not named ``timestamp``.
    """
    if df.empty:
        raise ValueError("Cannot create QueryResult from an empty DataFrame")

    headers = list(df.columns)
    if headers[0] != "timestamp":
        raise StreamPipesUnsupportedDataSeries(
            f"First column must be 'timestamp', got {headers[0]!r}"
        )

    # NOTE(review): a datetime64 column is cast to its raw integer value
    # (nanoseconds for datetime64[ns]); confirm that this unit matches what
    # the StreamPipes backend expects for timestamps.
    timestamps = df["timestamp"].astype("int64")
    # `assign` returns a new DataFrame, so the caller's `df` keeps its
    # original timestamp column instead of being overwritten in place.
    frame = df.assign(timestamp=timestamps)

    # Cast to object dtype before extracting rows. `.values` on an
    # all-numeric frame upcasts every column to one common dtype, which would
    # turn the int64 timestamps (and integer columns) into floats.
    # Missing scalars (NaN, None, NaT, pd.NA) are mapped to None, so that
    # `json.dumps` emits `null` rather than the non-standard `NaN` token.
    is_scalar = pd.api.types.is_scalar
    rows = [
        [None if is_scalar(cell) and pd.isna(cell) else cell for cell in row]
        for row in frame.astype(object).values.tolist()
    ]
    data_series = DataSeries(headers=headers, rows=rows, total=len(rows))

    return cls(
        total=len(rows),
        headers=headers,
        all_data_series=[data_series],
        query_status=query_status,
        source_index=source_index,
        for_id=for_id,
        last_timestamp=int(timestamps.max()),
    )