This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
new d227e5c572 Add error messages
d227e5c572 is described below
commit d227e5c57217941cd1a5dd35c80805c55d77011f
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Aug 19 16:10:28 2025 +0200
Add error messages
---
.../streampipes/endpoint/api/data_lake_measure.py | 34 +++++++++++-------
.../rest/impl/datalake/DataLakeDataWriter.java | 42 +++++++++++++++++++++-
.../rest/impl/datalake/DataLakeResource.java | 11 +++---
3 files changed, 69 insertions(+), 18 deletions(-)
diff --git
a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 4ddd93abd1..0b53e8f1fc 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -23,6 +23,7 @@ from datetime import datetime
from json import dumps
from typing import Any, Dict, List, Literal, Optional, Tuple, Type
+from pandas import DataFrame
from pydantic.v1 import BaseModel, Extra, Field, StrictInt, ValidationError,
validator
from streampipes.endpoint.endpoint import APIEndpoint
@@ -380,19 +381,23 @@ class DataLakeMeasureEndpoint(APIEndpoint):
response =
self._make_request(request_method=self._parent_client.request_session.get,
url=url)
return self._resource_cls(**response.json())
- def storeDataToMeasurement(self, identifier: str, query_result:
QueryResult) -> None:
- """Stores data into the specified data lake measurement.
+ def storeDataToMeasurement(self, identifier: str, df: DataFrame,
ignore_schema_mismatch=False) -> None:
+ """Stores data from a pandas DataFrame into the specified data lake
measurement.
- This method sends the provided `QueryResult` as JSON to the
StreamPipes Data Lake
- and appends it to the measurement identified by `identifier`.
+ The provided DataFrame will be converted into a `QueryResult` using
+ `QueryResult.from_pandas` and then serialized to JSON before being sent
+ to the StreamPipes Data Lake. The data will be appended to the
measurement
+ identified by `identifier`.
Parameters
----------
identifier : str
The identifier of the data lake measurement into which the data
will be stored.
- query_result : QueryResult
- The data to be stored, provided as a QueryResult object. It will
be serialized
- to JSON using its `to_dict()` representation.
+ df : pandas.DataFrame
+ The data to be stored, provided as a pandas DataFrame. The first
column
+ must be `timestamp` and all timestamp values will be cast to
integers.
+            ignore_schema_mismatch: bool, optional
+                Whether events whose schema does not match the measurement should still be stored (default: False).
Returns
-------
@@ -401,16 +406,19 @@ class DataLakeMeasureEndpoint(APIEndpoint):
Examples
--------
- >>> df = pd.DataFrame({
- ... "timestamp": [1672531200000, 1672531260000],
- ... "value": [42, 43],
- ... })
- >>> query_result = QueryResult.from_pandas(df)
- >>>
client.dataLakeMeasureEndpoint.storeDataToMeasurement("my-measure-id",
query_result)
+ ```python
+ df = pd.DataFrame({
+ "timestamp": [1672531200000, 1672531260000],
+ "value": [42, 43],
+ })
+ client.dataLakeMeasureApi.storeDataToMeasurement("my-measure-id", df)
+ ```
"""
+ query_result = QueryResult.from_pandas(df)
self._make_request(
request_method=self._parent_client.request_session.post,
url=f"{self.build_url()}/{identifier}",
+ params={"ignoreSchemaMismatch": ignore_schema_mismatch},
data=dumps(query_result.to_dict(use_source_names=True)),
headers={"Content-type": "application/json"},
)
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
index c4faeee687..5004014805 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -29,6 +29,8 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -36,8 +38,17 @@ import java.util.stream.IntStream;
public class DataLakeDataWriter {
+ private final boolean ignoreSchemaMismatch;
+
+ public DataLakeDataWriter(boolean ignoreSchemaMismatch) {
+ this.ignoreSchemaMismatch = ignoreSchemaMismatch;
+ }
+
public void writeData(String measureName, SpQueryResult queryResult) {
var measure =
CouchDbStorageManager.INSTANCE.getDataLakeStorage().getByMeasureName(measureName);
+ if (measure == null) {
+ throw new SpRuntimeException("Measure \"" + measureName + "\" not
found");
+ }
var dataSeries = getDataSeries(queryResult);
getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
}
@@ -45,10 +56,16 @@ public class DataLakeDataWriter {
private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
DataLakeMeasure
measure){
var timeSeriesStore = getTimeSeriesStore(measure);
+ var runtimeNames = getRuntimeNames(measure);
for (var row : dataSeries.getRows()) {
var event = rowToEvent(row, dataSeries.getHeaders());
renameTimestampField(event, measure.getTimestampField());
- timeSeriesStore.onEvent(event);
+ checkRuntimeNames(runtimeNames, event);
+ try {
+ timeSeriesStore.onEvent(event);
+ } catch (IllegalArgumentException e) {
+ throw new SpRuntimeException("Fields don't match for event: " +
event.getRaw());
+ }
}
timeSeriesStore.close();
}
@@ -71,6 +88,29 @@ public class DataLakeDataWriter {
}
}
+ private void checkRuntimeNames(List<String> runtimeNames, Event event) {
+ if (!ignoreSchemaMismatch) {
+ var strippedEventKeys = event.getFields().keySet().stream()
+ .map(this::getSubstringAfterColons)
+ .collect(Collectors.toSet());
+ var runtimeNameSet = new HashSet<>(runtimeNames);
+
+ if (!runtimeNameSet.equals(strippedEventKeys)){
+ throw new SpRuntimeException("The fields of the event do not match.
Use \"ignoreSchemaMismatch\" to "
+ + "ignore this error. Fields of the event: " + strippedEventKeys);
+ }
+ }
+ }
+
+ private List<String> getRuntimeNames(DataLakeMeasure measure) {
+ var runtimeNames = new ArrayList<String>();
+ runtimeNames.add(measure.getTimestampFieldName());
+ for (var eventProperties: measure.getEventSchema().getEventProperties()) {
+ runtimeNames.add(eventProperties.getRuntimeName());
+ }
+ return runtimeNames;
+ }
+
private String getSubstringAfterColons(String input) {
int index = input.indexOf("::");
if (index != -1) {
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index e63e1c5789..577e12fb78 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -379,14 +379,17 @@ public class DataLakeResource extends
AbstractRestResource {
@ApiResponse(
responseCode = "200",
description = "Successfully stored data")})
- public ResponseEntity<?> storeDataToMeasurement(@PathVariable String
measurementID,
- @RequestBody SpQueryResult
queryResult) {
- var dataWriter = new DataLakeDataWriter();
+ public ResponseEntity<?> storeDataToMeasurement(
+ @PathVariable String measurementID,
+ @RequestBody SpQueryResult queryResult,
+      @Parameter(in = ParameterIn.QUERY, description = "whether events whose
schema does not match the measurement should still be stored")
+ @RequestParam(value = "ignoreSchemaMismatch", required = false) boolean
ignoreSchemaMismatch) {
+ var dataWriter = new DataLakeDataWriter(ignoreSchemaMismatch);
try {
dataWriter.writeData(measurementID, queryResult);
} catch (SpRuntimeException e) {
LOG.warn("Could not store event", e);
- return badRequest(Notifications.error("Could not store event for
measurement " + measurementID));
+ return badRequest(Notifications.error("Could not store event for
measurement " + measurementID, e.getMessage()));
}
return ok();
}