This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
new 47b60dbcda Add python client documentation
47b60dbcda is described below
commit 47b60dbcdad08a92f83011aa9fa5631a97768ccd
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Aug 19 11:40:52 2025 +0200
Add python client documentation
---
.../streampipes/endpoint/api/data_lake_measure.py | 32 ++++++++++++++++++++--
.../streampipes/model/resource/query_result.py | 14 ++++------
2 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 2be1aa0dc1..4ddd93abd1 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -379,12 +379,38 @@ class DataLakeMeasureEndpoint(APIEndpoint):
response = self._make_request(request_method=self._parent_client.request_session.get, url=url)
return self._resource_cls(**response.json())
-
- def post(self, identifier: str, query_result: QueryResult) -> None:
+
+ def storeDataToMeasurement(self, identifier: str, query_result: QueryResult) -> None:
+ """Stores data into the specified data lake measurement.
+
+ This method sends the provided `QueryResult` as JSON to the StreamPipes Data Lake
+ and appends it to the measurement identified by `identifier`.
+
+ Parameters
+ ----------
+ identifier : str
+ The identifier of the data lake measurement into which the data will be stored.
+ query_result : QueryResult
+ The data to be stored, provided as a QueryResult object. It will be serialized
+ to JSON using its `to_dict()` representation.
+
+ Returns
+ -------
+ None
+ This method does not return anything.
+
+ Examples
+ --------
+ >>> df = pd.DataFrame({
+ ... "timestamp": [1672531200000, 1672531260000],
+ ... "value": [42, 43],
+ ... })
+ >>> query_result = QueryResult.from_pandas(df)
+ >>> client.dataLakeMeasureEndpoint.storeDataToMeasurement("my-measure-id", query_result)
+ """
self._make_request(
request_method=self._parent_client.request_session.post,
url=f"{self.build_url()}/{identifier}",
data=dumps(query_result.to_dict(use_source_names=True)),
headers={"Content-type": "application/json"},
)
-
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
index 47a89aeee9..049992d182 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -91,7 +91,7 @@ class QueryResult(Resource):
df = pd.DataFrame(data=pandas_representation["rows"], columns=pandas_representation["headers"])
return df
-
+
@classmethod
def from_pandas(
cls,
@@ -121,21 +121,19 @@ class QueryResult(Resource):
if df.empty:
raise ValueError("Cannot create QueryResult from an empty DataFrame")
- headers = list(df.columns)
+ headers = df.columns.to_list()
if headers[0] != "timestamp":
- raise StreamPipesUnsupportedDataSeries(
- f"First column must be 'timestamp', got {headers[0]!r}"
- )
-
+ raise StreamPipesUnsupportedDataSeries(f"First column must be 'timestamp', got {headers[0]!r}")
+
df["timestamp"] = df["timestamp"].astype("int64")
rows = df.values.tolist()
- data_series = DataSeries(headers=headers, rows=rows, total=len(rows))
+ data_series = DataSeries(total=len(rows), headers=headers, rows=rows, tags=None)
return cls(
total=len(rows),
headers=headers,
all_data_series=[data_series],
- query_status=query_status,
+ spQueryStatus=query_status,
source_index=source_index,
for_id=for_id,
last_timestamp=int(df["timestamp"].max()),