This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new b065306789 feat(#3733): Create API endpoint to upload time series data 
(#3746)
b065306789 is described below

commit b065306789e72ddc055d11451d74caefb8356646
Author: Sven Oehler <[email protected]>
AuthorDate: Mon Aug 25 14:07:56 2025 +0200

    feat(#3733): Create API endpoint to upload time series data (#3746)
    
    * Add endpoint to store measurements
    
    * Adjust python client
    
    * Change endpoint id to measure name
    
    * Add python client documentation
    
    * Add error messages
    
    * Add chunking when sending data
    
    * Fix checkstyle
    
    * Fix header
---
 .../streampipes/endpoint/api/data_lake_measure.py  |  64 +++++++++-
 .../streampipes/model/resource/query_result.py     |  49 ++++++++
 .../rest/impl/datalake/DataLakeDataWriter.java     | 136 +++++++++++++++++++++
 .../rest/impl/datalake/DataLakeResource.java       |  31 +++++
 4 files changed, 276 insertions(+), 4 deletions(-)

diff --git 
a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py 
b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 1ef107b1e7..629708a680 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -20,8 +20,11 @@ Specific implementation of the StreamPipes API's data lake 
measure endpoints.
 This endpoint allows to consume data stored in StreamPipes' data lake.
 """
 from datetime import datetime
+from json import dumps
+from math import ceil
 from typing import Any, Dict, List, Literal, Optional, Tuple, Type
 
+from pandas import DataFrame
 from pydantic.v1 import BaseModel, Extra, Field, StrictInt, ValidationError, 
validator
 
 from streampipes.endpoint.endpoint import APIEndpoint
@@ -199,7 +202,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
     Consequently, it allows querying metadata about available data sets (see 
`all()` method).
     The metadata is returned as an instance of 
[`DataLakeMeasures`][streampipes.model.container.DataLakeMeasures].
 
-    In addition, the endpoint provides direct access to the data stored in the 
data laka by querying a
+    In addition, the endpoint provides direct access to the data stored in the 
data lake by querying a
     specific data lake measure using the `get()` method.
 
     Examples
@@ -253,7 +256,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
     ```
 
     As you can see, the returned amount of rows per default is `1000`.
-    We can modify this behavior by passing the `limit` paramter.
+    We can modify this behavior by passing the `limit` parameter.
     ```python
     flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate", 
limit=10).to_pandas()
     len(flow_rate_pd)
@@ -347,7 +350,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         """Queries the specified data lake measure from the API.
 
         By default, the maximum number of returned records is 1000.
-        This behaviour can be influenced by passing the parameter `limit` with 
a different value
+        This behavior can be influenced by passing the parameter `limit` with 
a different value
         (see 
[MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig]).
 
         Parameters
@@ -369,7 +372,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         see directly at 
[DataLakeMeasureEndpoint][streampipes.endpoint.api.data_lake_measure.DataLakeMeasureEndpoint].
         """
 
-        # bild base URL for resource
+        # build base URL for resource
         url = f"{self.build_url()}/{identifier}"
 
         # extend base URL by query parameters
@@ -378,3 +381,56 @@ class DataLakeMeasureEndpoint(APIEndpoint):
 
         response = 
self._make_request(request_method=self._parent_client.request_session.get, 
url=url)
         return self._resource_cls(**response.json())
+
+    def storeDataToMeasurement(
+        self, identifier: str, df: DataFrame, ignore_schema_mismatch=False, 
batch_size: int = 10000
+    ) -> None:
+        """Stores data from a pandas DataFrame into the specified data lake 
measurement.
+
+        The provided DataFrame will be split into chunks and converted into a
+        `QueryResult` and then serialized to JSON before being sent to the
+        StreamPipes Data Lake. The data will be appended to the measurement
+        identified by `identifier`.
+
+        Parameters
+        ----------
+        identifier : str
+            The identifier of the data lake measurement into which the data 
will be stored.
+        df : pandas.DataFrame
+            The data to be stored, provided as a pandas DataFrame. The first 
column
+            must be `timestamp` and all timestamp values will be cast to 
integers.
+        ignore_schema_mismatch: bool
+            Defines if mismatching events should be stored.
+        batch_size:
+            The size of the chunks in which the data gets split. This ensures
+            that requests remain reasonably small
+
+        Returns
+        -------
+        None
+            This method does not return anything.
+
+        Examples
+        --------
+        ```python
+        df = pd.DataFrame({
+            "timestamp": [1672531200000, 1672531260000],
+            "value": [42, 43],
+        })
+        client.dataLakeMeasureApi.storeDataToMeasurement("my-measure-id", df)
+        ```
+        """
+
+        num_chunks = ceil(len(df) / batch_size)
+        for i in range(num_chunks):
+            start = i * batch_size
+            end = (i + 1) * batch_size
+            chunk = df.iloc[start:end].copy()
+            query_result = QueryResult.from_pandas(chunk)
+            self._make_request(
+                request_method=self._parent_client.request_session.post,
+                url=f"{self.build_url()}/{identifier}",
+                params={"ignoreSchemaMismatch": ignore_schema_mismatch},
+                data=dumps(query_result.to_dict(use_source_names=True)),
+                headers={"Content-type": "application/json"},
+            )
diff --git 
a/streampipes-client-python/streampipes/model/resource/query_result.py 
b/streampipes-client-python/streampipes/model/resource/query_result.py
index fbd613e9ce..1b251e0017 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -91,3 +91,52 @@ class QueryResult(Resource):
         df = pd.DataFrame(data=pandas_representation["rows"], 
columns=pandas_representation["headers"])
 
         return df
+
+    @classmethod
+    def from_pandas(
+        cls,
+        df: pd.DataFrame,
+        source_index: int = 0,
+        for_id: Optional[str] = None,
+        query_status: Literal["OK", "TOO_MUCH_DATA"] = "OK",
+    ) -> "QueryResult":
+        """Create a QueryResult object from a pandas DataFrame.
+
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame to convert into a QueryResult.
+        source_index : int, optional
+            Source index of the query result (default is 0).
+        for_id : str, optional
+            Optional identifier for the query result.
+        query_status : Literal["OK", "TOO_MUCH_DATA"], optional
+            Query status to set in the QueryResult (default is "OK").
+
+        Returns
+        -------
+        QueryResult
+            A QueryResult object representing the DataFrame.
+        """
+        if df.empty:
+            raise ValueError("Cannot create QueryResult from an empty 
DataFrame")
+        if df.isna().any().any():
+            raise ValueError("Cannot create QueryResult from a DataFrame with 
NaN values")
+
+        headers = df.columns.to_list()
+        if headers[0] != "timestamp":
+            raise StreamPipesUnsupportedDataSeries(f"First column must be 
'timestamp', got {headers[0]!r}")
+
+        df["timestamp"] = df["timestamp"].astype("int64")
+        rows = df.values.tolist()
+        data_series = DataSeries(total=len(rows), headers=headers, rows=rows, 
tags=None)
+
+        return cls(
+            total=len(rows),
+            headers=headers,
+            all_data_series=[data_series],
+            spQueryStatus=query_status,
+            source_index=source_index,
+            for_id=for_id,
+            last_timestamp=int(df["timestamp"].max()),
+        )
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
new file mode 100644
index 0000000000..72ae5bd07d
--- /dev/null
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class DataLakeDataWriter {
+
+  private final boolean ignoreSchemaMismatch;
+
+  public DataLakeDataWriter(boolean ignoreSchemaMismatch) {
+    this.ignoreSchemaMismatch = ignoreSchemaMismatch;
+  }
+
+  public void writeData(String measureName, SpQueryResult queryResult) {
+    var measure = 
CouchDbStorageManager.INSTANCE.getDataLakeStorage().getByMeasureName(measureName);
+    if (measure == null) {
+      throw new SpRuntimeException("Measure \"" + measureName + "\" not 
found");
+    }
+    var dataSeries = getDataSeries(queryResult);
+    getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
+  }
+
+  private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
+                                                       DataLakeMeasure 
measure){
+    var timeSeriesStore = getTimeSeriesStore(measure);
+    var runtimeNames = getRuntimeNames(measure);
+    for (var row : dataSeries.getRows()) {
+      var event = rowToEvent(row, dataSeries.getHeaders());
+      renameTimestampField(event, measure.getTimestampField());
+      checkRuntimeNames(runtimeNames, event);
+      try {
+        timeSeriesStore.onEvent(event);
+      } catch (IllegalArgumentException e) {
+        throw new SpRuntimeException("Fields don't match for event: " + 
event.getRaw());
+      }
+    }
+    timeSeriesStore.close();
+  }
+
+  private TimeSeriesStore getTimeSeriesStore(DataLakeMeasure measure){
+    return new TimeSeriesStore(
+        new DataExplorerDispatcher().getDataExplorerManager()
+            .getTimeseriesStorage(measure, false),
+        measure,
+        Environments.getEnvironment(),
+        true
+    );
+  }
+
+  private DataSeries getDataSeries(SpQueryResult queryResult) {
+    if (queryResult.getAllDataSeries().size() == 1) {
+      return queryResult.getAllDataSeries().get(0);
+    } else {
+      throw new SpRuntimeException("SpQueryResult must contain exactly one 
data series");
+    }
+  }
+
+  private void checkRuntimeNames(List<String> runtimeNames, Event event) {
+    if (!ignoreSchemaMismatch) {
+      var strippedEventKeys = event.getFields().keySet().stream()
+          .map(this::getSubstringAfterColons)
+          .collect(Collectors.toSet());
+      var runtimeNameSet = new HashSet<>(runtimeNames);
+
+      if (!runtimeNameSet.equals(strippedEventKeys)){
+        throw new SpRuntimeException("The fields of the event do not match. 
Use \"ignoreSchemaMismatch\" to "
+            + "ignore this error. Fields of the event: " + strippedEventKeys);
+      }
+    }
+  }
+
+  private List<String> getRuntimeNames(DataLakeMeasure measure) {
+    var runtimeNames = new ArrayList<String>();
+    runtimeNames.add(measure.getTimestampFieldName());
+    for (var eventProperties: measure.getEventSchema().getEventProperties()) {
+      runtimeNames.add(eventProperties.getRuntimeName());
+    }
+    return runtimeNames;
+  }
+
+  private String getSubstringAfterColons(String input) {
+    int index = input.indexOf("::");
+    if (index != -1) {
+      return input.substring(index + 2);
+    }
+    return input;
+  }
+
+  private Event rowToEvent(List<Object> row, List<String> headers){
+    Map<String, Object> eventMap = IntStream.range(0, headers.size())
+        .boxed()
+        .collect(Collectors.toMap(headers::get, row::get));
+    return EventFactory.fromMap(eventMap);
+  }
+
+  private void renameTimestampField(Event event, String timestampField){
+    var strippedTime = getSubstringAfterColons(timestampField);
+    event.addField(timestampField, 
event.getFieldByRuntimeName(strippedTime).getRawValue());
+  }
+
+}
+
+
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index c757a61688..296653909c 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.rest.impl.datalake;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.dataexplorer.export.OutputFormat;
@@ -38,6 +39,8 @@ import io.swagger.v3.oas.annotations.media.ArraySchema;
 import io.swagger.v3.oas.annotations.media.Content;
 import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -84,6 +87,7 @@ import static 
org.apache.streampipes.model.datalake.param.SupportedRestQueryPara
 @RequestMapping("/api/v4/datalake")
 public class DataLakeResource extends AbstractRestResource {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataLakeResource.class);
   private final IDataExplorerQueryManagement dataExplorerQueryManagement;
   private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
 
@@ -356,6 +360,33 @@ public class DataLakeResource extends AbstractRestResource 
{
     }
   }
 
+  @PostMapping(
+      path = "/measurements/{measurementID}",
+      produces = MediaType.APPLICATION_JSON_VALUE,
+      consumes = MediaType.APPLICATION_JSON_VALUE)
+  @Operation(summary = "Store a measurement series to a data lake with the 
given id", tags = {"Data Lake"},
+      responses = {
+          @ApiResponse(
+              responseCode = "400",
+              description = "Can't store the given data to this data lake"),
+          @ApiResponse(
+              responseCode = "200",
+              description = "Successfully stored data")})
+  public ResponseEntity<?> storeDataToMeasurement(
+      @PathVariable String measurementID,
+      @RequestBody SpQueryResult queryResult,
+      @Parameter(in = ParameterIn.QUERY, description = "should not identical 
schemas be stored")
+      @RequestParam(value = "ignoreSchemaMismatch", required = false) boolean 
ignoreSchemaMismatch) {
+    var dataWriter = new DataLakeDataWriter(ignoreSchemaMismatch);
+    try {
+      dataWriter.writeData(measurementID, queryResult);
+    } catch (SpRuntimeException e) {
+      LOG.warn("Could not store event", e);
+      return badRequest(Notifications.error("Could not store event for 
measurement " + measurementID, e.getMessage()));
+    }
+    return ok();
+  }
+
   @DeleteMapping(path = "/measurements")
   @Operation(summary = "Remove all stored measurement series from Data Lake", 
tags = {"Data Lake"},
       responses = {

Reply via email to