This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
     new d227e5c572 Add error messages
d227e5c572 is described below

commit d227e5c57217941cd1a5dd35c80805c55d77011f
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Aug 19 16:10:28 2025 +0200

    Add error messages
---
 .../streampipes/endpoint/api/data_lake_measure.py  | 34 +++++++++++-------
 .../rest/impl/datalake/DataLakeDataWriter.java     | 42 +++++++++++++++++++++-
 .../rest/impl/datalake/DataLakeResource.java       | 11 +++---
 3 files changed, 69 insertions(+), 18 deletions(-)

diff --git 
a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py 
b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 4ddd93abd1..0b53e8f1fc 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -23,6 +23,7 @@ from datetime import datetime
 from json import dumps
 from typing import Any, Dict, List, Literal, Optional, Tuple, Type
 
+from pandas import DataFrame
 from pydantic.v1 import BaseModel, Extra, Field, StrictInt, ValidationError, 
validator
 
 from streampipes.endpoint.endpoint import APIEndpoint
@@ -380,19 +381,23 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         response = 
self._make_request(request_method=self._parent_client.request_session.get, 
url=url)
         return self._resource_cls(**response.json())
 
-    def storeDataToMeasurement(self, identifier: str, query_result: 
QueryResult) -> None:
-        """Stores data into the specified data lake measurement.
+    def storeDataToMeasurement(self, identifier: str, df: DataFrame, 
ignore_schema_mismatch=False) -> None:
+        """Stores data from a pandas DataFrame into the specified data lake 
measurement.
 
-        This method sends the provided `QueryResult` as JSON to the 
StreamPipes Data Lake
-        and appends it to the measurement identified by `identifier`.
+        The provided DataFrame will be converted into a `QueryResult` using
+        `QueryResult.from_pandas` and then serialized to JSON before being sent
+        to the StreamPipes Data Lake. The data will be appended to the 
measurement
+        identified by `identifier`.
 
         Parameters
         ----------
         identifier : str
             The identifier of the data lake measurement into which the data 
will be stored.
-        query_result : QueryResult
-            The data to be stored, provided as a QueryResult object. It will 
be serialized
-            to JSON using its `to_dict()` representation.
+        df : pandas.DataFrame
+            The data to be stored, provided as a pandas DataFrame. The first 
column
+            must be `timestamp` and all timestamp values will be cast to 
integers.
+        ignore_schema_mismatch: bool
+            If True, events whose schema does not match are stored anyway.
 
         Returns
         -------
@@ -401,16 +406,19 @@ class DataLakeMeasureEndpoint(APIEndpoint):
 
         Examples
         --------
-        >>> df = pd.DataFrame({
-        ...     "timestamp": [1672531200000, 1672531260000],
-        ...     "value": [42, 43],
-        ... })
-        >>> query_result = QueryResult.from_pandas(df)
-        >>> 
client.dataLakeMeasureEndpoint.storeDataToMeasurement("my-measure-id", 
query_result)
+        ```python
+        df = pd.DataFrame({
+            "timestamp": [1672531200000, 1672531260000],
+            "value": [42, 43],
+        })
+        client.dataLakeMeasureApi.storeDataToMeasurement("my-measure-id", df)
+        ```
         """
+        query_result = QueryResult.from_pandas(df)
         self._make_request(
             request_method=self._parent_client.request_session.post,
             url=f"{self.build_url()}/{identifier}",
+            params={"ignoreSchemaMismatch": ignore_schema_mismatch},
             data=dumps(query_result.to_dict(use_source_names=True)),
             headers={"Content-type": "application/json"},
         )
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
index c4faeee687..5004014805 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -29,6 +29,8 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.EventFactory;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -36,8 +38,17 @@ import java.util.stream.IntStream;
 
 public class DataLakeDataWriter {
 
+  private final boolean ignoreSchemaMismatch;
+
+  public DataLakeDataWriter(boolean ignoreSchemaMismatch) {
+    this.ignoreSchemaMismatch = ignoreSchemaMismatch;
+  }
+
   public void writeData(String measureName, SpQueryResult queryResult) {
     var measure = 
CouchDbStorageManager.INSTANCE.getDataLakeStorage().getByMeasureName(measureName);
+    if (measure == null) {
+      throw new SpRuntimeException("Measure \"" + measureName + "\" not 
found");
+    }
     var dataSeries = getDataSeries(queryResult);
     getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
   }
@@ -45,10 +56,16 @@ public class DataLakeDataWriter {
   private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
                                                        DataLakeMeasure 
measure){
     var timeSeriesStore = getTimeSeriesStore(measure);
+    var runtimeNames = getRuntimeNames(measure);
     for (var row : dataSeries.getRows()) {
       var event = rowToEvent(row, dataSeries.getHeaders());
       renameTimestampField(event, measure.getTimestampField());
-      timeSeriesStore.onEvent(event);
+      checkRuntimeNames(runtimeNames, event);
+      try {
+        timeSeriesStore.onEvent(event);
+      } catch (IllegalArgumentException e) {
+        throw new SpRuntimeException("Fields don't match for event: " + 
event.getRaw());
+      }
     }
     timeSeriesStore.close();
   }
@@ -71,6 +88,29 @@ public class DataLakeDataWriter {
     }
   }
 
+  private void checkRuntimeNames(List<String> runtimeNames, Event event) {
+    if (!ignoreSchemaMismatch) {
+      var strippedEventKeys = event.getFields().keySet().stream()
+          .map(this::getSubstringAfterColons)
+          .collect(Collectors.toSet());
+      var runtimeNameSet = new HashSet<>(runtimeNames);
+
+      if (!runtimeNameSet.equals(strippedEventKeys)){
+        throw new SpRuntimeException("The fields of the event do not match the 
measurement schema. Set \"ignoreSchemaMismatch\" to "
+            + "ignore this error. Fields of the event: " + strippedEventKeys);
+      }
+    }
+  }
+
+  private List<String> getRuntimeNames(DataLakeMeasure measure) {
+    var runtimeNames = new ArrayList<String>();
+    runtimeNames.add(measure.getTimestampFieldName());
+    for (var eventProperties: measure.getEventSchema().getEventProperties()) {
+      runtimeNames.add(eventProperties.getRuntimeName());
+    }
+    return runtimeNames;
+  }
+
   private String getSubstringAfterColons(String input) {
     int index = input.indexOf("::");
     if (index != -1) {
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index e63e1c5789..577e12fb78 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -379,14 +379,17 @@ public class DataLakeResource extends 
AbstractRestResource {
           @ApiResponse(
               responseCode = "200",
               description = "Successfully stored data")})
-  public ResponseEntity<?> storeDataToMeasurement(@PathVariable String 
measurementID,
-                                                  @RequestBody SpQueryResult 
queryResult) {
-    var dataWriter = new DataLakeDataWriter();
+  public ResponseEntity<?> storeDataToMeasurement(
+      @PathVariable String measurementID,
+      @RequestBody SpQueryResult queryResult,
+      @Parameter(in = ParameterIn.QUERY, description = "whether events with a 
mismatching schema should be stored")
+      @RequestParam(value = "ignoreSchemaMismatch", required = false) boolean 
ignoreSchemaMismatch) {
+    var dataWriter = new DataLakeDataWriter(ignoreSchemaMismatch);
     try {
       dataWriter.writeData(measurementID, queryResult);
     } catch (SpRuntimeException e) {
       LOG.warn("Could not store event", e);
-      return badRequest(Notifications.error("Could not store event for 
measurement " + measurementID));
+      return badRequest(Notifications.error("Could not store event for 
measurement " + measurementID, e.getMessage()));
     }
     return ok();
   }

Reply via email to