This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new b065306789 feat(#3733): Create API endpoint to upload time series data (#3746)
b065306789 is described below
commit b065306789e72ddc055d11451d74caefb8356646
Author: Sven Oehler <[email protected]>
AuthorDate: Mon Aug 25 14:07:56 2025 +0200
feat(#3733): Create API endpoint to upload time series data (#3746)
* Add endpoint to store measurements
* Adjust python client
* Change endpoint id to measure name
* Add python client documentation
* Add error messages
* Add chunking when sending data
* Fix checkstyle
* Fix header
---
.../streampipes/endpoint/api/data_lake_measure.py | 64 +++++++++-
.../streampipes/model/resource/query_result.py | 49 ++++++++
.../rest/impl/datalake/DataLakeDataWriter.java | 136 +++++++++++++++++++++
.../rest/impl/datalake/DataLakeResource.java | 31 +++++
4 files changed, 276 insertions(+), 4 deletions(-)
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 1ef107b1e7..629708a680 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -20,8 +20,11 @@ Specific implementation of the StreamPipes API's data lake measure endpoints.
This endpoint allows consuming data stored in StreamPipes' data lake.
"""
from datetime import datetime
+from json import dumps
+from math import ceil
from typing import Any, Dict, List, Literal, Optional, Tuple, Type
+from pandas import DataFrame
from pydantic.v1 import BaseModel, Extra, Field, StrictInt, ValidationError, validator
from streampipes.endpoint.endpoint import APIEndpoint
@@ -199,7 +202,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
Consequently, it allows querying metadata about available data sets (see `all()` method).
The metadata is returned as an instance of [`DataLakeMeasures`][streampipes.model.container.DataLakeMeasures].
- In addition, the endpoint provides direct access to the data stored in the data laka by querying a
+ In addition, the endpoint provides direct access to the data stored in the data lake by querying a
specific data lake measure using the `get()` method.
Examples
@@ -253,7 +256,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
```
As you can see, the number of rows returned by default is `1000`.
- We can modify this behavior by passing the `limit` paramter.
+ We can modify this behavior by passing the `limit` parameter.
```python
flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate", limit=10).to_pandas()
len(flow_rate_pd)
@@ -347,7 +350,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
"""Queries the specified data lake measure from the API.
By default, the maximum number of returned records is 1000.
- This behaviour can be influenced by passing the parameter `limit` with a different value
+ This behavior can be influenced by passing the parameter `limit` with a different value
(see [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig]).
Parameters
@@ -369,7 +372,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
see directly at [DataLakeMeasureEndpoint][streampipes.endpoint.api.data_lake_measure.DataLakeMeasureEndpoint].
"""
- # bild base URL for resource
+ # build base URL for resource
url = f"{self.build_url()}/{identifier}"
# extend base URL by query parameters
@@ -378,3 +381,56 @@ class DataLakeMeasureEndpoint(APIEndpoint):
response = self._make_request(request_method=self._parent_client.request_session.get, url=url)
return self._resource_cls(**response.json())
+
+ def storeDataToMeasurement(
+ self, identifier: str, df: DataFrame, ignore_schema_mismatch: bool = False, batch_size: int = 10000
+ ) -> None:
+ """Stores data from a pandas DataFrame into the specified data lake
measurement.
+
+ The provided DataFrame is split into chunks; each chunk is converted into a
+ `QueryResult`, serialized to JSON, and sent to the StreamPipes data lake.
+ The data is appended to the measurement identified by `identifier`.
+
+ Parameters
+ ----------
+ identifier : str
+ The identifier of the data lake measurement into which the data will be stored.
+ df : pandas.DataFrame
+ The data to be stored, provided as a pandas DataFrame. The first column
+ must be `timestamp` and all timestamp values will be cast to integers.
+ ignore_schema_mismatch : bool
+ Defines whether events whose schema does not match the measurement schema should be stored anyway.
+ batch_size : int
+ The size of the chunks in which the data is split before sending; this keeps
+ individual requests reasonably small.
+
+ Returns
+ -------
+ None
+ This method does not return anything.
+
+ Examples
+ --------
+ ```python
+ df = pd.DataFrame({
+ "timestamp": [1672531200000, 1672531260000],
+ "value": [42, 43],
+ })
+ client.dataLakeMeasureApi.storeDataToMeasurement("my-measure-id", df)
+ ```
+ """
+
+ num_chunks = ceil(len(df) / batch_size)
+ for i in range(num_chunks):
+ start = i * batch_size
+ end = (i + 1) * batch_size
+ chunk = df.iloc[start:end].copy()
+ query_result = QueryResult.from_pandas(chunk)
+ self._make_request(
+ request_method=self._parent_client.request_session.post,
+ url=f"{self.build_url()}/{identifier}",
+ params={"ignoreSchemaMismatch": ignore_schema_mismatch},
+ data=dumps(query_result.to_dict(use_source_names=True)),
+ headers={"Content-type": "application/json"},
+ )
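
A minimal usage sketch of the new client method, assuming a running StreamPipes instance; host, port, credentials, and the measure name "flow-rate" are placeholders:

```python
import pandas as pd

from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials

# connect to a (placeholder) local StreamPipes instance
client = StreamPipesClient(
    client_config=StreamPipesClientConfig(
        credential_provider=StreamPipesApiKeyCredentials(
            username="[email protected]",
            api_key="XXX",
        ),
        host_address="localhost",
        https_disabled=True,
        port=80,
    )
)

# 25,000 rows are sent as three POST requests with the default batch_size of 10,000
df = pd.DataFrame({
    "timestamp": [1672531200000 + i * 1000 for i in range(25_000)],
    "value": [float(i) for i in range(25_000)],
})
client.dataLakeMeasureApi.storeDataToMeasurement("flow-rate", df)
```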
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
index fbd613e9ce..1b251e0017 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -91,3 +91,52 @@ class QueryResult(Resource):
df = pd.DataFrame(data=pandas_representation["rows"], columns=pandas_representation["headers"])
return df
+
+ @classmethod
+ def from_pandas(
+ cls,
+ df: pd.DataFrame,
+ source_index: int = 0,
+ for_id: Optional[str] = None,
+ query_status: Literal["OK", "TOO_MUCH_DATA"] = "OK",
+ ) -> "QueryResult":
+ """Create a QueryResult object from a pandas DataFrame.
+
+ Parameters
+ ----------
+ df : pd.DataFrame
+ The DataFrame to convert into a QueryResult.
+ source_index : int, optional
+ Source index of the query result (default is 0).
+ for_id : str, optional
+ Optional identifier for the query result.
+ query_status : Literal["OK", "TOO_MUCH_DATA"], optional
+ Query status to set in the QueryResult (default is "OK").
+
+ Returns
+ -------
+ QueryResult
+ A QueryResult object representing the DataFrame.
+ """
+ if df.empty:
+ raise ValueError("Cannot create QueryResult from an empty
DataFrame")
+ if df.isna().any().any():
+ raise ValueError("Cannot create QueryResult from a DataFrame with
NaN values")
+
+ headers = df.columns.to_list()
+ if headers[0] != "timestamp":
+ raise StreamPipesUnsupportedDataSeries(f"First column must be 'timestamp', got {headers[0]!r}")
+
+ df["timestamp"] = df["timestamp"].astype("int64")
+ rows = df.values.tolist()
+ data_series = DataSeries(total=len(rows), headers=headers, rows=rows,
tags=None)
+
+ return cls(
+ total=len(rows),
+ headers=headers,
+ all_data_series=[data_series],
+ spQueryStatus=query_status,
+ source_index=source_index,
+ for_id=for_id,
+ last_timestamp=int(df["timestamp"].max()),
+ )
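
A short round-trip sketch of the new classmethod, assuming the Python client package is installed (column names are illustrative):

```python
import pandas as pd

from streampipes.model.resource.query_result import QueryResult

df = pd.DataFrame({
    "timestamp": [1672531200000, 1672531260000],
    "value": [42, 43],
})

# from_pandas validates the frame (non-empty, no NaNs, first column "timestamp")
# and wraps it into a QueryResult with a single DataSeries
query_result = QueryResult.from_pandas(df)

# to_pandas (shown above) restores the tabular structure
print(query_result.to_pandas())
```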
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
new file mode 100644
index 0000000000..72ae5bd07d
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class DataLakeDataWriter {
+
+ private final boolean ignoreSchemaMismatch;
+
+ public DataLakeDataWriter(boolean ignoreSchemaMismatch) {
+ this.ignoreSchemaMismatch = ignoreSchemaMismatch;
+ }
+
+ public void writeData(String measureName, SpQueryResult queryResult) {
+ var measure = CouchDbStorageManager.INSTANCE.getDataLakeStorage().getByMeasureName(measureName);
+ if (measure == null) {
+ throw new SpRuntimeException("Measure \"" + measureName + "\" not
found");
+ }
+ var dataSeries = getDataSeries(queryResult);
+ getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
+ }
+
+ private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
+ DataLakeMeasure measure) {
+ var timeSeriesStore = getTimeSeriesStore(measure);
+ var runtimeNames = getRuntimeNames(measure);
+ for (var row : dataSeries.getRows()) {
+ var event = rowToEvent(row, dataSeries.getHeaders());
+ renameTimestampField(event, measure.getTimestampField());
+ checkRuntimeNames(runtimeNames, event);
+ try {
+ timeSeriesStore.onEvent(event);
+ } catch (IllegalArgumentException e) {
+ throw new SpRuntimeException("Fields don't match for event: " +
event.getRaw());
+ }
+ }
+ timeSeriesStore.close();
+ }
+
+ private TimeSeriesStore getTimeSeriesStore(DataLakeMeasure measure) {
+ return new TimeSeriesStore(
+ new DataExplorerDispatcher().getDataExplorerManager()
+ .getTimeseriesStorage(measure, false),
+ measure,
+ Environments.getEnvironment(),
+ true
+ );
+ }
+
+ private DataSeries getDataSeries(SpQueryResult queryResult) {
+ if (queryResult.getAllDataSeries().size() == 1) {
+ return queryResult.getAllDataSeries().get(0);
+ } else {
+ throw new SpRuntimeException("SpQueryResult must contain exactly one
data series");
+ }
+ }
+
+ private void checkRuntimeNames(List<String> runtimeNames, Event event) {
+ if (!ignoreSchemaMismatch) {
+ var strippedEventKeys = event.getFields().keySet().stream()
+ .map(this::getSubstringAfterColons)
+ .collect(Collectors.toSet());
+ var runtimeNameSet = new HashSet<>(runtimeNames);
+
+ if (!runtimeNameSet.equals(strippedEventKeys)) {
+ throw new SpRuntimeException("The fields of the event do not match. Use \"ignoreSchemaMismatch\" to "
+ + "ignore this error. Fields of the event: " + strippedEventKeys);
+ }
+ }
+ }
+
+ private List<String> getRuntimeNames(DataLakeMeasure measure) {
+ var runtimeNames = new ArrayList<String>();
+ runtimeNames.add(measure.getTimestampFieldName());
+ for (var eventProperty : measure.getEventSchema().getEventProperties()) {
+ runtimeNames.add(eventProperty.getRuntimeName());
+ }
+ return runtimeNames;
+ }
+
+ private String getSubstringAfterColons(String input) {
+ int index = input.indexOf("::");
+ if (index != -1) {
+ return input.substring(index + 2);
+ }
+ return input;
+ }
+
+ private Event rowToEvent(List<Object> row, List<String> headers) {
+ Map<String, Object> eventMap = IntStream.range(0, headers.size())
+ .boxed()
+ .collect(Collectors.toMap(headers::get, row::get));
+ return EventFactory.fromMap(eventMap);
+ }
+
+ private void renameTimestampField(Event event, String timestampField) {
+ var strippedTime = getSubstringAfterColons(timestampField);
+ event.addField(timestampField, event.getFieldByRuntimeName(strippedTime).getRawValue());
+ }
+
+}
+
+
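
The schema check in `checkRuntimeNames` compares the measure's runtime names against the event keys with their selector prefix stripped; a hedged Python equivalent of `getSubstringAfterColons` and that comparison (field names are illustrative):

```python
def strip_selector_prefix(field: str) -> str:
    # "s0::temperature" -> "temperature"; fields without "::" pass through
    index = field.find("::")
    return field[index + 2:] if index != -1 else field

runtime_names = {"timestamp", "temperature"}
event_keys = {"s0::timestamp", "s0::temperature"}

# mirrors checkRuntimeNames: reject the event unless the stripped keys match exactly
assert {strip_selector_prefix(key) for key in event_keys} == runtime_names
```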
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index c757a61688..296653909c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.rest.impl.datalake;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.export.OutputFormat;
@@ -38,6 +39,8 @@ import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@@ -84,6 +87,7 @@ import static org.apache.streampipes.model.datalake.param.SupportedRestQueryPara
@RequestMapping("/api/v4/datalake")
public class DataLakeResource extends AbstractRestResource {
+ private static final Logger LOG = LoggerFactory.getLogger(DataLakeResource.class);
private final IDataExplorerQueryManagement dataExplorerQueryManagement;
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
@@ -356,6 +360,33 @@ public class DataLakeResource extends AbstractRestResource {
}
}
+ @PostMapping(
+ path = "/measurements/{measurementID}",
+ produces = MediaType.APPLICATION_JSON_VALUE,
+ consumes = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "Store a measurement series in the data lake under the given id", tags = {"Data Lake"},
+ responses = {
+ @ApiResponse(
+ responseCode = "400",
+ description = "Can't store the given data to this data lake"),
+ @ApiResponse(
+ responseCode = "200",
+ description = "Successfully stored data")})
+ public ResponseEntity<?> storeDataToMeasurement(
+ @PathVariable String measurementID,
+ @RequestBody SpQueryResult queryResult,
+ @Parameter(in = ParameterIn.QUERY, description = "whether events whose schema does not match the measurement schema should be stored anyway")
+ @RequestParam(value = "ignoreSchemaMismatch", required = false) boolean ignoreSchemaMismatch) {
+ var dataWriter = new DataLakeDataWriter(ignoreSchemaMismatch);
+ try {
+ dataWriter.writeData(measurementID, queryResult);
+ } catch (SpRuntimeException e) {
+ LOG.warn("Could not store event", e);
+ return badRequest(Notifications.error("Could not store event for measurement " + measurementID, e.getMessage()));
+ }
+ return ok();
+ }
+
@DeleteMapping(path = "/measurements")
@Operation(summary = "Remove all stored measurement series from Data Lake",
tags = {"Data Lake"},
responses = {
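
For completeness, the new endpoint can also be exercised without the Python client. Below is a hedged sketch using `requests`; the `/streampipes-backend` path prefix, the auth header names, and the camel-case field spellings mirror the client's `to_dict(use_source_names=True)` serialization and common StreamPipes defaults, so treat all of them as assumptions:

```python
import requests

# payload shaped like QueryResult.to_dict(use_source_names=True);
# exact field spellings are assumptions
payload = {
    "total": 2,
    "headers": ["timestamp", "value"],
    "allDataSeries": [{
        "total": 2,
        "headers": ["timestamp", "value"],
        "rows": [[1672531200000, 42], [1672531260000, 43]],
        "tags": None,
    }],
    "spQueryStatus": "OK",
    "sourceIndex": 0,
}

response = requests.post(
    "http://localhost/streampipes-backend/api/v4/datalake/measurements/flow-rate",
    params={"ignoreSchemaMismatch": "false"},
    json=payload,
    headers={"X-API-User": "[email protected]", "X-API-Key": "XXX"},
)
response.raise_for_status()  # 400 indicates a schema mismatch or an unknown measure
```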