This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new b2dbf1c27e feat: write data to Datalake from Java Client (#4047)
b2dbf1c27e is described below

commit b2dbf1c27e3382265b76e149543616038be5261f
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Tue Dec 16 09:55:10 2025 +0100

    feat: write data to Datalake from Java Client (#4047)
    
    Co-authored-by: Philipp Zehnder <[email protected]>
---
 .../client/api/IDataLakeResourceApi.java           | 24 ++++----
 .../streampipes/client/api/IStreamPipesClient.java |  2 +
 .../streampipes/client/StreamPipesClient.java      |  6 ++
 .../streampipes/client/api/AbstractClientApi.java  |  6 +-
 .../client/api/DataLakeResourceApi.java            | 69 ++++++++++++++++++++++
 .../client/serializer/ObjectSerializer.java        |  8 +++
 .../streampipes/client/serializer/Serializer.java  | 10 +++-
 .../client/serializer/SpQuerySerializer.java       | 64 ++++++++++++++++++++
 .../influx/DataExplorerQueryManagementInflux.java  |  9 +--
 .../rest/impl/connect/AdapterResource.java         |  2 +-
 .../rest/impl/datalake/DataLakeDataWriter.java     |  3 +-
 .../rest/impl/datalake/DataLakeResource.java       |  3 +-
 12 files changed, 183 insertions(+), 23 deletions(-)

diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IDataLakeResourceApi.java
similarity index 60%
copy from 
streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
copy to 
streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IDataLakeResourceApi.java
index 5e1e99e5af..c9bb85e210 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
+++ 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IDataLakeResourceApi.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.client.serializer;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+package org.apache.streampipes.client.api;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.model.datalake.SpQueryResult;
 
-public class ObjectSerializer<K, V> extends Serializer<K, V, V> {
+import java.util.Map;
+
+public interface IDataLakeResourceApi {
+  
+  void delete(String measurementID, Long startDate, Long endDate);
+
+  void update(String measurementID, SpQueryResult queryResult, boolean 
ignoreSchemaMismatch);
+
+  SpQueryResult get(String measurementID, Map<String, String> queryParams);
 
-  @Override
-  public V deserialize(String response, Class<V> targetClass) {
-    try {
-      return objectMapper.readValue(response, targetClass);
-    } catch (JsonProcessingException e) {
-      e.printStackTrace();
-      throw new SpRuntimeException(e.fillInStackTrace());
-    }
-  }
 }
diff --git 
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
index ac8df164ca..44f5064431 100644
--- 
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
+++ 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IStreamPipesClient.java
@@ -59,4 +59,6 @@ public interface IStreamPipesClient extends Serializable {
   void deliverEmail(SpEmail email);
 
   IFileApi fileApi();
+  
+  IDataLakeResourceApi dataLakeResourceApi();
 }
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index b9c6543760..d159809b1e 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.client.api.AdapterApi;
 import org.apache.streampipes.client.api.AdminApi;
 import org.apache.streampipes.client.api.CustomRequestApi;
 import org.apache.streampipes.client.api.DataLakeMeasureApi;
+import org.apache.streampipes.client.api.DataLakeResourceApi;
 import org.apache.streampipes.client.api.DataProcessorApi;
 import org.apache.streampipes.client.api.DataSinkApi;
 import org.apache.streampipes.client.api.DataStreamApi;
@@ -225,4 +226,9 @@ public class StreamPipesClient implements
   public FileApi fileApi() {
     return new FileApi(config);
   }
+
+    @Override
+  public DataLakeResourceApi dataLakeResourceApi () {
+    return new DataLakeResourceApi (config);
+  }
 }
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 17be13e40f..9e643d4daf 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -56,10 +56,12 @@ public class AbstractClientApi {
   }
 
   protected <T> void post(StreamPipesApiPath apiPath, T object) {
-    ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
-    new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, 
object).executeRequest();
+
+    ObjectSerializer <T,Void> serializer = new ObjectSerializer<>(true);
+    new PostRequestWithoutPayloadResponse(clientConfig, apiPath, serializer, 
object).executeRequest();
   }
 
+
   protected <T> void put(StreamPipesApiPath apiPath, T object) {
     ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
     new PutRequest<>(clientConfig, apiPath, serializer, 
object).executeRequest();
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeResourceApi.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeResourceApi.java
new file mode 100644
index 0000000000..726eb5d313
--- /dev/null
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeResourceApi.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataLakeResourceApi extends AbstractClientApi implements 
IDataLakeResourceApi {
+
+  public DataLakeResourceApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig);
+  }
+
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromStreamPipesBasePath()
+        .addToPath("api")
+        .addToPath("v4")
+        .addToPath("datalake")
+        .addToPath("measurements");
+  }
+
+  @Override
+  public void delete(String measurementID, Long startDate, Long endDate) {
+
+    Map<String, String> queryParams = new HashMap<>();
+    if (startDate != null) {
+      queryParams.put("startDate", startDate.toString());
+    }
+    if (endDate != null) {
+      queryParams.put("endDate", endDate.toString());
+    }
+    
delete(getBaseResourcePath().addToPath(measurementID).withQueryParameters(queryParams),
 Void.class);
+
+  }
+
+  @Override
+  public void update(String measurementID, SpQueryResult queryResult, boolean 
ignoreSchemaMismatch) {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("ignoreSchemaMismatch", 
String.valueOf(ignoreSchemaMismatch));
+    
post(getBaseResourcePath().addToPath(measurementID).withQueryParameters(queryParams),
 queryResult);
+  }
+
+  @Override
+  public SpQueryResult get(String measurementID, Map<String, String> 
queryParams) {
+    return 
getSingle(getBaseResourcePath().addToPath(measurementID).withQueryParameters(queryParams),
+        SpQueryResult.class);
+  }
+
+}
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
index 5e1e99e5af..ba39d0792d 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/ObjectSerializer.java
@@ -23,6 +23,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 
 public class ObjectSerializer<K, V> extends Serializer<K, V, V> {
 
+    public ObjectSerializer() {
+        super();  
+    }
+
+    public ObjectSerializer(boolean useDefaultTyping) {
+        super(useDefaultTyping);  
+    }
+
   @Override
   public V deserialize(String response, Class<V> targetClass) {
     try {
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
index 435d76a59b..5ed6cac869 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/Serializer.java
@@ -28,7 +28,15 @@ public abstract class Serializer<K, V, T> {
   protected ObjectMapper objectMapper;
 
   public Serializer() {
-    this.objectMapper = JacksonSerializer.getObjectMapper();
+    this(false);
+  }
+
+  public Serializer(boolean useDefaultTyping) {
+    if (useDefaultTyping) {
+      this.objectMapper = 
JacksonSerializer.getObjectMapper().deactivateDefaultTyping();
+    } else {
+      this.objectMapper = JacksonSerializer.getObjectMapper();
+    }
   }
 
   public String serialize(K object) {
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/SpQuerySerializer.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/SpQuerySerializer.java
new file mode 100644
index 0000000000..69bdf54563
--- /dev/null
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/serializer/SpQuerySerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.serializer;
+
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class SpQuerySerializer {
+    public static SpQueryResult processEventDataWithTags(List<? extends 
Map<String, ?>> events,
+            TreeMap<String, String> tags) throws IOException {
+
+        if (tags == null) {
+            tags = new TreeMap<>();
+        }
+
+        List<String> headers = new ArrayList<>(events.get(0).keySet());
+        List<List<Object>> rows = new ArrayList<>();
+
+        for (Map<String, ?> event : events) {
+            List<Object> row = new ArrayList<>();
+            for (String header : headers) {
+                row.add(event.get(header));
+            }
+            rows.add(row);
+        }
+
+        DataSeries series = new DataSeries(events.size(), rows, headers, new 
HashMap<>());
+
+        SpQueryResult queryResult = new SpQueryResult(events.size(), headers, 
Collections.singletonList(series));
+        List<DataSeries> resultSeries = new ArrayList<>();
+
+        for (DataSeries s : queryResult.getAllDataSeries()) {
+            s.setTags(tags);
+            resultSeries.add(s);
+        }
+
+        queryResult.setAllDataSeries(resultSeries);
+
+        return queryResult;
+    }
+}
diff --git 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
index 0129fd4beb..01129f1ec2 100644
--- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
+++ 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerQueryManagementInflux.java
@@ -18,17 +18,18 @@
 
 package org.apache.streampipes.dataexplorer.influx;
 
+import org.apache.streampipes.dataexplorer.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.StreamedQueryResultProvider;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.dataexplorer.export.OutputFormat;
-import org.apache.streampipes.dataexplorer.QueryResultProvider;
-import org.apache.streampipes.dataexplorer.StreamedQueryResultProvider;
 import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
 import 
org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
-import org.apache.streampipes.model.datalake.SpQueryStatus;
-import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.datalake.SpQueryStatus;
+import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
+
 
 import java.io.IOException;
 import java.io.OutputStream;
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index 1a4f4241d9..8087e1707a 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -230,7 +230,7 @@ public class AdapterResource extends 
AbstractAdapterResource<AdapterMasterManage
           try {
             managementService.deleteAdapter(elementId);
 
-            return ok(Notifications.success("Adapter with id: " + elementId + 
" is dexleted."));
+            return ok(Notifications.success("Adapter with id: " + elementId + 
" is deleted."));
           } catch (AdapterException e) {
             LOG.error("Error while deleting adapter with id {}", elementId, e);
             return ok(Notifications.error(e.getMessage()));
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
index 72ae5bd07d..0d3549f8a2 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -128,7 +128,8 @@ public class DataLakeDataWriter {
 
   private void renameTimestampField(Event event, String timestampField){
     var strippedTime = getSubstringAfterColons(timestampField);
-    event.addField(timestampField, 
event.getFieldByRuntimeName(strippedTime).getRawValue());
+    event.addField(timestampField, 
event.getFieldByRuntimeName(strippedTime).getAsPrimitive()
+          .getAsLong());
   }
 
 }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index 691b45a963..84713d2889 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -123,7 +123,8 @@ public class DataLakeResource extends AbstractRestResource {
       @Parameter(in = ParameterIn.QUERY, description = "end date for slicing 
operation") @RequestParam(value = "endDate", required = false) Long endDate) {
 
     if (this.dataExplorerQueryManagement.deleteData(measurementID, startDate, 
endDate)) {
-      return ok();
+      return ok(Notifications
+          .success("Successfully deleted measure " + measurementID + " between 
" + startDate + " and " + endDate));
     } else {
       return ResponseEntity
           .status(HttpStatus.NOT_FOUND)

Reply via email to