This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new b2dbf1c27e feat: write data to Datalake from Java Client (#4047)
b2dbf1c27e is described below
commit b2dbf1c27e3382265b76e149543616038be5261f
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Tue Dec 16 09:55:10 2025 +0100
feat: write data to Datalake from Java Client (#4047)
Co-authored-by: Philipp Zehnder <[email protected]>
---
.../client/api/IDataLakeResourceApi.java | 24 ++++----
.../streampipes/client/api/IStreamPipesClient.java | 2 +
.../streampipes/client/StreamPipesClient.java | 6 ++
.../streampipes/client/api/AbstractClientApi.java | 6 +-
.../client/api/DataLakeResourceApi.java | 69 ++++++++++++++++++++++
.../client/serializer/ObjectSerializer.java | 8 +++
.../streampipes/client/serializer/Serializer.java | 10 +++-
.../client/serializer/SpQuerySerializer.java | 64 ++++++++++++++++++++
.../influx/DataExplorerQueryManagementInflux.java | 9 +--
.../rest/impl/connect/AdapterResource.java | 2 +-
.../rest/impl/datalake/DataLakeDataWriter.java | 3 +-
.../rest/impl/datalake/DataLakeResource.java | 3 +-
12 files changed, 183 insertions(+), 23 deletions(-)
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IDataLakeResourceApi.java
similarity index 60%
copy from
streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
copy to
streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IDataLakeResourceApi.java
index 5e1e99e5af..c9bb85e210 100644
---
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
+++
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IDataLakeResourceApi.java
@@ -15,21 +15,19 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.client.serializer;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+package org.apache.streampipes.client.api;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.model.datalake.SpQueryResult;
-public class ObjectSerializer<K, V> extends Serializer<K, V, V> {
+import java.util.Map;
+
+public interface IDataLakeResourceApi {
+
+ void delete(String measurementID, Long startDate, Long endDate);
+
+ void update(String measurementID, SpQueryResult queryResult, boolean
ignoreSchemaMismatch);
+
+ SpQueryResult get(String measurementID, Map<String, String> queryParams);
- @Override
- public V deserialize(String response, Class<V> targetClass) {
- try {
- return objectMapper.readValue(response, targetClass);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- throw new SpRuntimeException(e.fillInStackTrace());
- }
- }
}
diff --git
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
index ac8df164ca..44f5064431 100644
---
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
+++
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
@@ -59,4 +59,6 @@ public interface IStreamPipesClient extends Serializable {
void deliverEmail(SpEmail email);
IFileApi fileApi();
+
+ IDataLakeResourceApi dataLakeResourceApi();
}
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index b9c6543760..d159809b1e 100644
---
a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.client.api.AdapterApi;
import org.apache.streampipes.client.api.AdminApi;
import org.apache.streampipes.client.api.CustomRequestApi;
import org.apache.streampipes.client.api.DataLakeMeasureApi;
+import org.apache.streampipes.client.api.DataLakeResourceApi;
import org.apache.streampipes.client.api.DataProcessorApi;
import org.apache.streampipes.client.api.DataSinkApi;
import org.apache.streampipes.client.api.DataStreamApi;
@@ -225,4 +226,9 @@ public class StreamPipesClient implements
public FileApi fileApi() {
return new FileApi(config);
}
+
+ @Override
+ public DataLakeResourceApi dataLakeResourceApi () {
+ return new DataLakeResourceApi (config);
+ }
}
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 17be13e40f..9e643d4daf 100644
---
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -56,10 +56,12 @@ public class AbstractClientApi {
}
protected <T> void post(StreamPipesApiPath apiPath, T object) {
- ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
- new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer,
object).executeRequest();
+
+ ObjectSerializer <T,Void> serializer = new ObjectSerializer<>(true);
+ new PostRequestWithoutPayloadResponse(clientConfig, apiPath, serializer,
object).executeRequest();
}
+
protected <T> void put(StreamPipesApiPath apiPath, T object) {
ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
new PutRequest<>(clientConfig, apiPath, serializer,
object).executeRequest();
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeResourceApi.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeResourceApi.java
new file mode 100644
index 0000000000..726eb5d313
--- /dev/null
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeResourceApi.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataLakeResourceApi extends AbstractClientApi implements
IDataLakeResourceApi {
+
+ public DataLakeResourceApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig);
+ }
+
+ protected StreamPipesApiPath getBaseResourcePath() {
+ return StreamPipesApiPath.fromStreamPipesBasePath()
+ .addToPath("api")
+ .addToPath("v4")
+ .addToPath("datalake")
+ .addToPath("measurements");
+ }
+
+ @Override
+ public void delete(String measurementID, Long startDate, Long endDate) {
+
+ Map<String, String> queryParams = new HashMap<>();
+ if (startDate != null) {
+ queryParams.put("startDate", startDate.toString());
+ }
+ if (endDate != null) {
+ queryParams.put("endDate", endDate.toString());
+ }
+
delete(getBaseResourcePath().addToPath(measurementID).withQueryParameters(queryParams),
Void.class);
+
+ }
+
+ @Override
+ public void update(String measurementID, SpQueryResult queryResult, boolean
ignoreSchemaMismatch) {
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put("ignoreSchemaMismatch",
String.valueOf(ignoreSchemaMismatch));
+
post(getBaseResourcePath().addToPath(measurementID).withQueryParameters(queryParams),
queryResult);
+ }
+
+ @Override
+ public SpQueryResult get(String measurementID, Map<String, String>
queryParams) {
+ return
getSingle(getBaseResourcePath().addToPath(measurementID).withQueryParameters(queryParams),
+ SpQueryResult.class);
+ }
+
+}
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
index 5e1e99e5af..ba39d0792d 100644
---
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
@@ -23,6 +23,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
public class ObjectSerializer<K, V> extends Serializer<K, V, V> {
+ public ObjectSerializer() {
+ super();
+ }
+
+ public ObjectSerializer(boolean useDefaultTyping) {
+ super(useDefaultTyping);
+ }
+
@Override
public V deserialize(String response, Class<V> targetClass) {
try {
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
index 435d76a59b..5ed6cac869 100644
---
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
@@ -28,7 +28,15 @@ public abstract class Serializer<K, V, T> {
protected ObjectMapper objectMapper;
public Serializer() {
- this.objectMapper = JacksonSerializer.getObjectMapper();
+ this(false);
+ }
+
+ public Serializer(boolean useDefaultTyping) {
+ if (useDefaultTyping) {
+ this.objectMapper =
JacksonSerializer.getObjectMapper().deactivateDefaultTyping();
+ } else {
+ this.objectMapper = JacksonSerializer.getObjectMapper();
+ }
}
public String serialize(K object) {
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/SpQuerySerializer.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/SpQuerySerializer.java
new file mode 100644
index 0000000000..69bdf54563
--- /dev/null
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/SpQuerySerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.serializer;
+
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class SpQuerySerializer {
+ public static SpQueryResult processEventDataWithTags(List<? extends
Map<String, ?>> events,
+ TreeMap<String, String> tags) throws IOException {
+
+ if (tags == null) {
+ tags = new TreeMap<>();
+ }
+
+ List<String> headers = new ArrayList<>(events.get(0).keySet());
+ List<List<Object>> rows = new ArrayList<>();
+
+ for (Map<String, ?> event : events) {
+ List<Object> row = new ArrayList<>();
+ for (String header : headers) {
+ row.add(event.get(header));
+ }
+ rows.add(row);
+ }
+
+ DataSeries series = new DataSeries(events.size(), rows, headers, new
HashMap<>());
+
+ SpQueryResult queryResult = new SpQueryResult(events.size(), headers,
Collections.singletonList(series));
+ List<DataSeries> resultSeries = new ArrayList<>();
+
+ for (DataSeries s : queryResult.getAllDataSeries()) {
+ s.setTags(tags);
+ resultSeries.add(s);
+ }
+
+ queryResult.setAllDataSeries(resultSeries);
+
+ return queryResult;
+ }
+}
diff --git
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
index 0129fd4beb..01129f1ec2 100644
---
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
+++
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
@@ -18,17 +18,18 @@
package org.apache.streampipes.dataexplorer.influx;
+import org.apache.streampipes.dataexplorer.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.StreamedQueryResultProvider;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.export.OutputFormat;
-import org.apache.streampipes.dataexplorer.QueryResultProvider;
-import org.apache.streampipes.dataexplorer.StreamedQueryResultProvider;
import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
import
org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
-import org.apache.streampipes.model.datalake.SpQueryStatus;
-import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.datalake.SpQueryStatus;
+import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
+
import java.io.IOException;
import java.io.OutputStream;
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index 1a4f4241d9..8087e1707a 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -230,7 +230,7 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
try {
managementService.deleteAdapter(elementId);
- return ok(Notifications.success("Adapter with id: " + elementId +
" is dexleted."));
+ return ok(Notifications.success("Adapter with id: " + elementId +
" is deleted."));
} catch (AdapterException e) {
LOG.error("Error while deleting adapter with id {}", elementId, e);
return ok(Notifications.error(e.getMessage()));
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
index 72ae5bd07d..0d3549f8a2 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -128,7 +128,8 @@ public class DataLakeDataWriter {
private void renameTimestampField(Event event, String timestampField){
var strippedTime = getSubstringAfterColons(timestampField);
- event.addField(timestampField,
event.getFieldByRuntimeName(strippedTime).getRawValue());
+ event.addField(timestampField,
event.getFieldByRuntimeName(strippedTime).getAsPrimitive()
+ .getAsLong());
}
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index 691b45a963..84713d2889 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -123,7 +123,8 @@ public class DataLakeResource extends AbstractRestResource {
@Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation") @RequestParam(value = "endDate", required = false) Long endDate) {
if (this.dataExplorerQueryManagement.deleteData(measurementID, startDate,
endDate)) {
- return ok();
+ return ok(Notifications
+ .success("Successfully deleted measure " + measurementID + " between
" + startDate + " and " + endDate));
} else {
return ResponseEntity
.status(HttpStatus.NOT_FOUND)