This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 6a283327c2 feat: Improved Logging and Manual Trigger for Datalake
Retention (#3945)
6a283327c2 is described below
commit 6a283327c2e6eb4754faa28b70ec1d86b1620be8
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Wed Nov 26 08:16:23 2025 +0100
feat: Improved Logging and Manual Trigger for Datalake Retention (#3945)
Co-authored-by: Dominik Riemer <[email protected]>
---
.../apache/streampipes/commons/constants/Envs.java | 1 +
.../commons/environment/DefaultEnvironment.java | 5 +
.../commons/environment/Environment.java | 1 +
streampipes-data-export/pom.xml | 5 +
.../streampipes/export/DataLakeExportManager.java | 129 +++++----
.../model/datalake/RetentionExportConfig.java | 57 +++-
.../streampipes/model/datalake/RetentionLog.java | 69 +++++
.../rest/impl/datalake/DataLakeResource.java | 305 ++++++++-------------
.../service/core/scheduler/DataLakeScheduler.java | 158 +----------
ui/deployment/i18n/de.json | 12 +-
ui/deployment/i18n/en.json | 15 +-
.../src/lib/apis/datalake-rest.service.ts | 7 +
.../src/lib/model/gen/streampipes-model-client.ts | 5 +-
.../src/lib/model/gen/streampipes-model.ts | 26 +-
ui/src/app/configuration/configuration.module.ts | 4 +
.../datalake-configuration.component.html | 76 +++++
.../datalake-configuration.component.ts | 24 ++
.../data-retention-dialog.component.html | 13 +-
.../data-retention-dialog.component.ts | 21 +-
.../data-retention-log-dialog.component.html | 97 +++++++
.../data-retention-log-dialog.component.ts | 42 +--
.../data-retention-now-dialog.component.html | 81 ++++++
.../data-retention-now-dialog.component.ts | 75 +++++
23 files changed, 797 insertions(+), 431 deletions(-)
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 5ad79d8e07..148fa66abd 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -156,6 +156,7 @@ public enum Envs {
//
//@Scheduled(cron = "0
//
*/2 * * *
// *") //Cron Job in Dev Setting; Running every 2 min
+ SP_RETENTION_LOG_LENGTH("SP_RETENTION_LOG_LENGTH", "10"),
// Logging
SP_LOGGING_FILE_ENABLED("SP_LOGGING_FILE_ENABLED", "false"),
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 3627c40299..2e93ea7f80 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -591,4 +591,9 @@ public class DefaultEnvironment implements Environment {
public StringEnvironmentVariable getDatalakeSchedulerCron() {
return new StringEnvironmentVariable(Envs.SP_DATALAKE_SCHEDULER_CRON);
}
+
+ @Override
+ public IntEnvironmentVariable getDatalakeRetentionLogLength() {
+ return new IntEnvironmentVariable(Envs.SP_RETENTION_LOG_LENGTH);
+ }
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index af604ac00e..30567d9214 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -253,4 +253,5 @@ public interface Environment {
DoubleEnvironmentVariable getMemoryManagerUsageThreshold();
DoubleEnvironmentVariable getMemoryWarningThreshold();
StringEnvironmentVariable getDatalakeSchedulerCron();
+ IntEnvironmentVariable getDatalakeRetentionLogLength();
}
diff --git a/streampipes-data-export/pom.xml b/streampipes-data-export/pom.xml
index 63bc466812..db8f417e48 100644
--- a/streampipes-data-export/pom.xml
+++ b/streampipes-data-export/pom.xml
@@ -52,6 +52,11 @@
<artifactId>streampipes-storage-management</artifactId>
<version>0.99.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-data-explorer-management</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/DataLakeExportManager.java
similarity index 66%
copy from
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
copy to
streampipes-data-export/src/main/java/org/apache/streampipes/export/DataLakeExportManager.java
index 123a051a6c..5078ab836e 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/DataLakeExportManager.java
@@ -15,9 +15,8 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.service.core.scheduler;
+package org.apache.streampipes.export;
-import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import
org.apache.streampipes.dataexplorer.export.ObjectStorge.ExportProviderFactory;
@@ -28,15 +27,12 @@ import
org.apache.streampipes.model.configuration.ExportProviderSettings;
import org.apache.streampipes.model.configuration.ProviderType;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.RetentionAction;
+import org.apache.streampipes.model.datalake.RetentionLog;
import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.SchedulingConfigurer;
-import org.springframework.scheduling.config.ScheduledTaskRegistrar;
-import org.springframework.scheduling.support.CronTrigger;
import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import java.io.IOException;
@@ -46,10 +42,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-@Configuration
-public class DataLakeScheduler implements SchedulingConfigurer {
+public class DataLakeExportManager {
- private static final Logger LOG =
LoggerFactory.getLogger(DataLakeScheduler.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(DataLakeExportManager.class);
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement =
new DataExplorerDispatcher()
.getDataExplorerManager()
@@ -59,7 +54,9 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
.getDataExplorerManager()
.getQueryManagement(this.dataExplorerSchemaManagement);
- private void exportMeasurement(DataLakeMeasure dataLakeMeasure, Instant
now, long endDate) {
+ private String savePath = "";
+
+ private void exportMeasurement(DataLakeMeasure dataLakeMeasure, Instant
now, long endDate) throws Exception {
if (System.getenv("SP_RETENTION_LOCAL_DIR") == null ||
System.getenv("SP_RETENTION_LOCAL_DIR").isEmpty()) {
LOG.error("For Local Retention Storage, please configure the
environment variable SP_RETENTION_LOCAL_DIR");
@@ -91,8 +88,8 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
String exportProviderId =
dataLakeMeasure.getRetentionTime().getRetentionExportConfig()
.getExportProviderId();
- // FInd Item in Document
+ // FInd Item in Document
List<ExportProviderSettings> exportProviders =
StorageDispatcher.INSTANCE
.getNoSqlStore()
.getSpCoreConfigurationStorage()
@@ -110,7 +107,7 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
if (exportProviderSetting == null) {
LOG.error("The desired export provider was not found. No export
has been done.");
- return;
+ throw new Exception("The desired export provider was not found. No
export has been done.");
}
ProviderType providerType = exportProviderSetting.getProviderType();
@@ -123,25 +120,53 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
providerType, dataLakeMeasure.getMeasureName(),
exportProviderSetting,
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig().format());
exportProvider.store(streamingOutput);
+ savePath = exportProvider.getFileName();
} catch (IllegalArgumentException e) {
- LOG.error("Export provider could not be created. Unsupported
provider type: {}. Error: {}", providerType,
- e.getMessage(), e);
+ String msg = String.format(
+ "Export provider could not be created. Unsupported
provider type: %s. Error: %s",
+ providerType, e.getMessage());
+
+ LOG.error(msg, e);
+ throw new IllegalArgumentException(msg, e);
+
} catch (IOException e) {
- LOG.error("I/O error occurred while trying to store data. Provider
Type: {}. Error: {}", providerType,
- e.getMessage(), e);
+ String msg = String.format(
+ "I/O error occurred while trying to store data. Provider
Type: %s. Error: %s",
+ providerType, e.getMessage());
+
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+
} catch (RuntimeException e) {
- LOG.error("Runtime exception occurred while attempting to store
data. Provider Type: {}. Error: {}",
- providerType, e.getMessage(), e);
+
+ String msg = String.format(
+ "Runtime exception occurred while attempting to store
data. Provider Type: %s. Error: %s",
+ providerType, e.getMessage());
+
+ LOG.error(msg, e);
+ throw new RuntimeException(msg, e);
} catch (Exception e) {
LOG.error("An unexpected error occurred during export. Provider
Type: {}. Error: {}", providerType,
e.getMessage(), e);
+ throw new Exception(
+ String.format("An unexpected error occurred during export.
Provider Type: %s. Error: %s",
+ providerType, e.getMessage()),
+ e);
}
}
+ private void updateLastSync(DataLakeMeasure dataLakeMeasure, Instant now,
boolean success, String error) {
+
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().setLastExport(now.toString());
+ dataLakeMeasure.getRetentionTime().getRetentionExportConfig()
+ .addRetentionLog(new RetentionLog(success, this.savePath,
now.toString(), error));
+ this.dataExplorerSchemaManagement.updateMeasurement(dataLakeMeasure);
+
+ }
+
private void deleteMeasurement(DataLakeMeasure dataLakeMeasure, Instant
now, long endDate) {
LOG.info("Current time in millis: " + now.toEpochMilli());
@@ -162,48 +187,40 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
return result;
}
- public void cleanupMeasurements() {
- List<DataLakeMeasure> allMeasurements =
this.dataExplorerSchemaManagement.getAllMeasurements();
- LOG.info("GET ALL Measurements");
- for (DataLakeMeasure dataLakeMeasure : allMeasurements) {
-
- if (dataLakeMeasure.getRetentionTime() != null) {
- LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
-
- var result = getStartAndEndTime(
-
dataLakeMeasure.getRetentionTime().getDataRetentionConfig().olderThanDays());
- Instant now = (Instant) result.get("now");
- long endDate = (Long) result.get("endDate");
-
- if
(dataLakeMeasure.getRetentionTime().getDataRetentionConfig().action() !=
RetentionAction.DELETE) {
- LOG.info("Start saving Measurement " +
dataLakeMeasure.getMeasureName());
- exportMeasurement(dataLakeMeasure, now, endDate);
- LOG.info("Measurements " +
dataLakeMeasure.getMeasureName() + " successfully saved");
- }
- if
(dataLakeMeasure.getRetentionTime().getDataRetentionConfig().action() !=
RetentionAction.SAVE) {
- LOG.info("Start delete Measurement " +
dataLakeMeasure.getMeasureName());
- deleteMeasurement(dataLakeMeasure, now, endDate);
- LOG.info("Measurements " +
dataLakeMeasure.getMeasureName() + " successfully deleted");
- }
- }
- }
- }
-
- @Override
- public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
- var env = Environments.getEnvironment();
- LOG.info("Retention CRON Job triggered.");
- taskRegistrar.addTriggerTask(
+ public void cleanupSingleMeasurement(DataLakeMeasure dataLakeMeasure)
throws Exception {
+ boolean success = false;
+ Instant now = Instant.now();
+ if (dataLakeMeasure.getRetentionTime() != null) {
+ LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
- this::cleanupMeasurements,
+ var result = getStartAndEndTime(
+
dataLakeMeasure.getRetentionTime().getDataRetentionConfig().olderThanDays());
+ now = (Instant) result.get("now");
+ long endDate = (Long) result.get("endDate");
+ if
(dataLakeMeasure.getRetentionTime().getDataRetentionConfig().action() !=
RetentionAction.DELETE) {
+ LOG.info("Start saving Measurement " +
dataLakeMeasure.getMeasureName());
+ try {
+ exportMeasurement(dataLakeMeasure, now, endDate);
+ } catch (Exception e) {
+ updateLastSync(dataLakeMeasure, now, false,
e.getMessage());
+ throw new Exception(e);
- triggerContext -> new
CronTrigger(env.getDatalakeSchedulerCron().getValueOrDefault())
- .nextExecution(triggerContext)
+ }
+ LOG.info("Measurements " + dataLakeMeasure.getMeasureName() +
" successfully saved");
+ }
+ if
(dataLakeMeasure.getRetentionTime().getDataRetentionConfig().action() !=
RetentionAction.SAVE) {
+ LOG.info("Start delete Measurement " +
dataLakeMeasure.getMeasureName());
+ deleteMeasurement(dataLakeMeasure, now, endDate);
+ LOG.info("Measurements " + dataLakeMeasure.getMeasureName() +
" successfully deleted");
+ }
+ success = true;
+ updateLastSync(dataLakeMeasure, now, success, "-");
- );
+ }
+
- LOG.info("Retention CRON Job finished.");
}
-}
\ No newline at end of file
+
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
index f126a63744..3e71ccc3f6 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
@@ -17,14 +17,26 @@
*/
package org.apache.streampipes.model.datalake;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+
+import java.util.ArrayList;
+import java.util.List;
+
public class RetentionExportConfig {
-
- private ExportConfig exportConfig;
+
+ private ExportConfig exportConfig;
private String exportProviderId;
+ private String lastExport;
+ private List<RetentionLog> retentionLog = new ArrayList<>();
- public RetentionExportConfig(ExportConfig exportConfig, String
exportProviderId) {
+ public RetentionExportConfig(ExportConfig exportConfig, String
exportProviderId, String lastExport,
+ List<RetentionLog> retentionLog) {
this.exportConfig = exportConfig;
this.exportProviderId = exportProviderId;
+ this.lastExport = lastExport;
+ this.retentionLog = retentionLog;
}
public ExportConfig getExportConfig() {
@@ -34,6 +46,7 @@ public class RetentionExportConfig {
public void setExportConfig(ExportConfig exportConfig) {
this.exportConfig = exportConfig;
}
+
public String getExportProviderId() {
return exportProviderId;
}
@@ -41,4 +54,42 @@ public class RetentionExportConfig {
public void setExportProviderId(String exportProviderId) {
this.exportProviderId = exportProviderId;
}
+
+ public void setLastExport(String lastExport) {
+ this.lastExport = lastExport;
+ }
+
+ public String getLastExport() {
+ return lastExport;
+
+ }
+
+ public List<RetentionLog> getRetentionLog() {
+ return retentionLog;
+ }
+
+ public void setRetentionLog(List<RetentionLog> retentionLog) {
+ this.retentionLog = retentionLog;
+ }
+
+ public void addRetentionLog(RetentionLog log) {
+
+ Environment env = Environments.getEnvironment();
+
+ int maxSize = env.getDatalakeRetentionLogLength().getValueOrDefault();
+
+ if (this.retentionLog != null) {
+
+ if (this.retentionLog.size() >= maxSize) {
+ this.retentionLog.remove(0);
+ }
+
+ this.retentionLog.add(log);
+ } else {
+
+ this.retentionLog = new ArrayList<>();
+ this.retentionLog.add(log);
+ }
+ }
+
}
\ No newline at end of file
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionLog.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionLog.java
new file mode 100644
index 0000000000..045b09050c
--- /dev/null
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionLog.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.model.datalake;
+
+public class RetentionLog {
+ private boolean status;
+ private String name;
+ private String date;
+ private String error;
+
+ public RetentionLog() {
+ }
+
+ public RetentionLog(boolean status, String name, String date) {
+ this.status = status;
+ this.name = name;
+ this.date = date;
+ }
+
+ public RetentionLog(boolean status, String name, String date, String
error) {
+ this.status = status;
+ this.name = name;
+ this.date = date;
+ this.error = error;
+ }
+
+ public boolean getStatus() {
+ return status;
+ }
+ public void setStatus(boolean status) {
+ this.status = status;
+ }
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public String getDate() {
+ return date;
+ }
+ public void setDate(String date) {
+ this.date = date;
+ }
+ public String getError() {
+ return error;
+ }
+ public void setError(String error) {
+ this.error = error;
+ }
+
+}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index 2b93b2476c..691b45a963 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.export.OutputFormat;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
+import org.apache.streampipes.export.DataLakeExportManager;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataSeries;
import org.apache.streampipes.model.datalake.RetentionTimeConfig;
@@ -93,6 +94,7 @@ public class DataLakeResource extends AbstractRestResource {
private static final Logger LOG =
LoggerFactory.getLogger(DataLakeResource.class);
private final IDataExplorerQueryManagement dataExplorerQueryManagement;
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
+ private static DataLakeExportManager dataLakeExportManager = new
DataLakeExportManager();
public DataLakeResource() {
this.dataExplorerSchemaManagement = new DataExplorerDispatcher()
@@ -111,19 +113,16 @@ public class DataLakeResource extends
AbstractRestResource {
}
@DeleteMapping(path = "/measurements/{measurementID}")
- @Operation(summary = "Remove data from a single measurement series with
given id", tags = {"Data Lake"},
- responses = {
+ @Operation(summary = "Remove data from a single measurement series with
given id", tags = {
+ "Data Lake" }, responses = {
@ApiResponse(responseCode = "200", description = "Data from
measurement series successfully removed"),
- @ApiResponse(responseCode = "400", description = "Measurement series
with given id not found")})
+ @ApiResponse(responseCode = "400", description = "Measurement series
with given id not found") })
public ResponseEntity<?> deleteData(
- @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true)
- @PathVariable("measurementID") String measurementID
- , @Parameter(in = ParameterIn.QUERY, description = "start date for
slicing operation")
- @RequestParam(value = "startDate", required = false) Long startDate
- , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation")
- @RequestParam(value = "endDate", required = false) Long endDate) {
-
- if (this.dataExplorerQueryManagement.deleteData(measurementID, startDate,
endDate)){
+ @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true) @PathVariable("measurementID") String
measurementID,
+ @Parameter(in = ParameterIn.QUERY, description = "start date for slicing
operation") @RequestParam(value = "startDate", required = false) Long startDate,
+ @Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation") @RequestParam(value = "endDate", required = false) Long endDate) {
+
+ if (this.dataExplorerQueryManagement.deleteData(measurementID, startDate,
endDate)) {
return ok();
} else {
return ResponseEntity
@@ -134,19 +133,12 @@ public class DataLakeResource extends
AbstractRestResource {
@DeleteMapping(path = "/measurements/{measurementID}/drop")
@Operation(summary = "Drop a single measurement series with given id from
Data Lake and "
- + "remove related event property",
- tags = {
- "Data Lake"},
- responses = {
- @ApiResponse(
- responseCode = "200",
- description = "Measurement series successfully dropped from Data
Lake"),
- @ApiResponse(
- responseCode = "400",
- description = "Measurement series with given id or related event
property not found")})
+ + "remove related event property", tags = {
+ "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "200", description = "Measurement
series successfully dropped from Data Lake"),
+ @ApiResponse(responseCode = "400", description = "Measurement
series with given id or related event property not found") })
public ResponseEntity<?> dropMeasurementSeries(
- @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true)
- @PathVariable("measurementID") String measurementID) {
+ @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true) @PathVariable("measurementID") String
measurementID) {
boolean isSuccessDataLake =
this.dataExplorerQueryManagement.deleteData(measurementID);
@@ -167,12 +159,8 @@ public class DataLakeResource extends AbstractRestResource
{
}
@GetMapping(path = "/measurements", produces =
MediaType.APPLICATION_JSON_VALUE)
- @Operation(summary = "Get a list of all measurement series", tags = {"Data
Lake"},
- responses = {
- @ApiResponse(
- responseCode = "200",
- description = "array of stored measurement series",
- content = @Content(array = @ArraySchema(schema =
@Schema(implementation = DataLakeMeasure.class))))})
+ @Operation(summary = "Get a list of all measurement series", tags = { "Data
Lake" }, responses = {
+ @ApiResponse(responseCode = "200", description = "array of stored
measurement series", content = @Content(array = @ArraySchema(schema =
@Schema(implementation = DataLakeMeasure.class)))) })
public ResponseEntity<List<DataLakeMeasure>> getAll() {
List<DataLakeMeasure> allMeasurements =
this.dataExplorerSchemaManagement.getAllMeasurements();
return ok(allMeasurements);
@@ -180,69 +168,34 @@ public class DataLakeResource extends
AbstractRestResource {
@GetMapping(path = "/measurements/{measurementId}/tags", produces =
MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Map<String, Object>>
getTagValues(@PathVariable("measurementId") String measurementId,
-
@RequestParam("fields") String fields) {
+ @RequestParam("fields") String fields) {
Map<String, Object> tagValues =
dataExplorerQueryManagement.getTagValues(measurementId, fields);
return ok(tagValues);
}
-
@GetMapping(path = "/measurements/{measurementID}", produces =
MediaType.APPLICATION_JSON_VALUE)
- @Operation(summary = "Get data from a single measurement series by a given
id", tags = {"Data Lake"},
- responses = {
- @ApiResponse(
- responseCode = "400",
- description = "Measurement series with given id and requested
query specification not found"),
- @ApiResponse(
- responseCode = "200",
- description = "requested data", content = @Content(schema =
@Schema(implementation = DataSeries.class)))})
+ @Operation(summary = "Get data from a single measurement series by a given
id", tags = { "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "400", description = "Measurement series
with given id and requested query specification not found"),
+ @ApiResponse(responseCode = "200", description = "requested data",
content = @Content(schema = @Schema(implementation = DataSeries.class))) })
public ResponseEntity<?> getData(
- @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true)
- @PathVariable("measurementID") String measurementID
- , @Parameter(in = ParameterIn.QUERY, description = "the columns to be
selected (comma-separated)")
- @RequestParam(value = QP_COLUMNS, required = false) String columns
- , @Parameter(in = ParameterIn.QUERY, description = "start date for
slicing operation")
- @RequestParam(value = QP_START_DATE, required = false) Long startDate
- , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation")
- @RequestParam(value = QP_END_DATE, required = false) Long endDate
- , @Parameter(in = ParameterIn.QUERY, description = "page number for
paging operation")
- @RequestParam(value = QP_PAGE, required = false) Integer page
- , @Parameter(in = ParameterIn.QUERY, description = "maximum number of
retrieved query results")
- @RequestParam(value = QP_LIMIT, required = false) Integer limit
- , @Parameter(in = ParameterIn.QUERY, description = "offset")
- @RequestParam(value = QP_OFFSET, required = false) Integer offset
- , @Parameter(in = ParameterIn.QUERY, description = "grouping tags
(comma-separated) for grouping operation")
- @RequestParam(value = QP_GROUP_BY, required = false) String groupBy
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "ordering of retrieved query results (ASC or DESC -
default is ASC)")
- @RequestParam(value = QP_ORDER, required = false) String order
- , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation
function used for grouping operation")
- @RequestParam(value = QP_AGGREGATION_FUNCTION, required = false) String
aggregationFunction
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "time interval for aggregation (e.g. 1m - one minute)
for grouping operation")
- @RequestParam(value = QP_TIME_INTERVAL, required = false) String
timeInterval
- , @Parameter(in = ParameterIn.QUERY, description = "only return the
number of results")
- @RequestParam(value = QP_COUNT_ONLY, required = false) String countOnly
- ,
- @Parameter(in = ParameterIn.QUERY, description = "auto-aggregate the
number of results to avoid browser overload")
- @RequestParam(value = QP_AUTO_AGGREGATE, required = false) boolean
autoAggregate
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "filter conditions (a comma-separated list of filter
conditions"
- + "such as [field,operator,condition])")
- @RequestParam(value = QP_FILTER, required = false) String filter
- , @Parameter(in = ParameterIn.QUERY, description =
"missingValueBehaviour (ignore or empty)")
- @RequestParam(value = QP_MISSING_VALUE_BEHAVIOUR, required = false)
String missingValueBehaviour
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "the maximum amount of resulting events,"
- + "when too high the query status is set to TOO_MUCH_DATA")
- @RequestParam(value = QP_MAXIMUM_AMOUNT_OF_EVENTS, required = false)
Integer maximumAmountOfResults,
+ @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true) @PathVariable("measurementID") String
measurementID,
+ @Parameter(in = ParameterIn.QUERY, description = "the columns to be
selected (comma-separated)") @RequestParam(value = QP_COLUMNS, required =
false) String columns,
+ @Parameter(in = ParameterIn.QUERY, description = "start date for slicing
operation") @RequestParam(value = QP_START_DATE, required = false) Long
startDate,
+ @Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation") @RequestParam(value = QP_END_DATE, required = false) Long endDate,
+ @Parameter(in = ParameterIn.QUERY, description = "page number for paging
operation") @RequestParam(value = QP_PAGE, required = false) Integer page,
+ @Parameter(in = ParameterIn.QUERY, description = "maximum number of
retrieved query results") @RequestParam(value = QP_LIMIT, required = false)
Integer limit,
+ @Parameter(in = ParameterIn.QUERY, description = "offset")
@RequestParam(value = QP_OFFSET, required = false) Integer offset,
+ @Parameter(in = ParameterIn.QUERY, description = "grouping tags
(comma-separated) for grouping operation") @RequestParam(value = QP_GROUP_BY,
required = false) String groupBy,
+ @Parameter(in = ParameterIn.QUERY, description = "ordering of retrieved
query results (ASC or DESC - default is ASC)") @RequestParam(value = QP_ORDER,
required = false) String order,
+ @Parameter(in = ParameterIn.QUERY, description = "name of aggregation
function used for grouping operation") @RequestParam(value =
QP_AGGREGATION_FUNCTION, required = false) String aggregationFunction,
+ @Parameter(in = ParameterIn.QUERY, description = "time interval for
aggregation (e.g. 1m - one minute) for grouping operation") @RequestParam(value
= QP_TIME_INTERVAL, required = false) String timeInterval,
+ @Parameter(in = ParameterIn.QUERY, description = "only return the number
of results") @RequestParam(value = QP_COUNT_ONLY, required = false) String
countOnly,
+ @Parameter(in = ParameterIn.QUERY, description = "auto-aggregate the
number of results to avoid browser overload") @RequestParam(value =
QP_AUTO_AGGREGATE, required = false) boolean autoAggregate,
+ @Parameter(in = ParameterIn.QUERY, description = "filter conditions (a
comma-separated list of filter conditions"
+ + "such as [field,operator,condition])") @RequestParam(value =
QP_FILTER, required = false) String filter,
+ @Parameter(in = ParameterIn.QUERY, description = "missingValueBehaviour
(ignore or empty)") @RequestParam(value = QP_MISSING_VALUE_BEHAVIOUR, required
= false) String missingValueBehaviour,
+ @Parameter(in = ParameterIn.QUERY, description = "the maximum amount of
resulting events,"
+ + "when too high the query status is set to TOO_MUCH_DATA")
@RequestParam(value = QP_MAXIMUM_AMOUNT_OF_EVENTS, required = false) Integer
maximumAmountOfResults,
@RequestParam Map<String, String> queryParams) {
if (!(checkProvidedQueryParams(queryParams))) {
@@ -250,8 +203,8 @@ public class DataLakeResource extends AbstractRestResource {
} else {
ProvidedRestQueryParams sanitizedParams = populate(measurementID,
queryParams);
try {
- SpQueryResult result =
- this.dataExplorerQueryManagement.getData(sanitizedParams,
isIgnoreMissingValues(missingValueBehaviour));
+ SpQueryResult result =
this.dataExplorerQueryManagement.getData(sanitizedParams,
+ isIgnoreMissingValues(missingValueBehaviour));
return ok(result);
} catch (RuntimeException e) {
return badRequest(SpLogMessage.from(e));
@@ -259,10 +212,7 @@ public class DataLakeResource extends AbstractRestResource
{
}
}
- @PostMapping(
- path = "/query",
- produces = MediaType.APPLICATION_JSON_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE)
+ @PostMapping(path = "/query", produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<List<SpQueryResult>> getData(@RequestBody
List<Map<String, String>> queryParams) {
var results = queryParams
.stream()
@@ -274,69 +224,33 @@ public class DataLakeResource extends
AbstractRestResource {
}
@GetMapping(path = "/measurements/{measurementID}/download", produces =
MediaType.APPLICATION_OCTET_STREAM_VALUE)
- @Operation(summary = "Download data from a single measurement series by a
given id", tags = {"Data Lake"},
- responses = {
- @ApiResponse(
- responseCode = "400",
- description = "Measurement series with given id and requested
query specification not found"),
- @ApiResponse(
- responseCode = "200",
- description = "requested data", content = @Content(schema =
@Schema(implementation = DataSeries.class)))})
+ @Operation(summary = "Download data from a single measurement series by a
given id", tags = {
+ "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "400", description = "Measurement series
with given id and requested query specification not found"),
+ @ApiResponse(responseCode = "200", description = "requested data",
content = @Content(schema = @Schema(implementation = DataSeries.class))) })
public ResponseEntity<StreamingResponseBody> downloadData(
- @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true)
- @PathVariable("measurementID") String measurementID
- , @Parameter(in = ParameterIn.QUERY, description = "the columns to be
selected (comma-separated)")
- @RequestParam(value = QP_COLUMNS, required = false) String columns
- , @Parameter(in = ParameterIn.QUERY, description = "start date for
slicing operation")
- @RequestParam(value = QP_START_DATE, required = false) Long startDate
- , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation")
- @RequestParam(value = QP_END_DATE, required = false) Long endDate
- , @Parameter(in = ParameterIn.QUERY, description = "page number for
paging operation")
- @RequestParam(value = QP_PAGE, required = false) Integer page
- , @Parameter(in = ParameterIn.QUERY, description = "maximum number of
retrieved query results")
- @RequestParam(value = QP_LIMIT, required = false) Integer limit
- , @Parameter(in = ParameterIn.QUERY, description = "offset")
- @RequestParam(value = QP_OFFSET, required = false) Integer offset
- , @Parameter(in = ParameterIn.QUERY, description = "grouping tags
(comma-separated) for grouping operation")
- @RequestParam(value = QP_GROUP_BY, required = false) String groupBy
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "ordering of retrieved query results (ASC or DESC -
default is ASC)")
- @RequestParam(value = QP_ORDER, required = false) String order
- , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation
function used for grouping operation")
- @RequestParam(value = QP_AGGREGATION_FUNCTION, required = false) String
aggregationFunction
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "time interval for aggregation (e.g. 1m - one minute)
for grouping operation")
- @RequestParam(value = QP_TIME_INTERVAL, required = false) String
timeInterval
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "format specification (csv, json - default is csv) for
data download")
- @RequestParam(value = QP_FORMAT, required = false) String format
- , @Parameter(in = ParameterIn.QUERY, description = "csv delimiter (comma
or semicolon)")
- @RequestParam(value = QP_CSV_DELIMITER, required = false) String
csvDelimiter
- , @Parameter(in = ParameterIn.QUERY, description =
"missingValueBehaviour (ignore or empty)")
- @RequestParam(value = QP_MISSING_VALUE_BEHAVIOUR, required = false)
String missingValueBehaviour
- ,
- @Parameter(
- in = ParameterIn.QUERY,
- description = "filter conditions (a comma-separated list of filter
conditions"
- + "such as [field,operator,condition])")
- @RequestParam(value = QP_FILTER, required = false) String filter,
- @Parameter(in = ParameterIn.QUERY, description = "Excel export with
template")
- @RequestParam(value = QP_XLSX_USE_TEMPLATE, required = false) boolean
useTemplate
- , @Parameter(in = ParameterIn.QUERY, description = "ID of the excel
template file to use")
- @RequestParam(value = QP_XLSX_TEMPLATE_ID, required = false) String
templateId
- , @Parameter(in = ParameterIn.QUERY, description = "The first row in the
excel file where data should be written")
- @RequestParam(value = QP_XLSX_START_ROW, required = false) Integer
startRow,
- @Parameter(in = ParameterIn.QUERY, description = "Use either label or
key as the column header")
- @RequestParam(value = QP_HEADER_COLUMN_NAME, required = false) String
headerColumnName,
+ @Parameter(in = ParameterIn.PATH, description = "the id of the
measurement series", required = true) @PathVariable("measurementID") String
measurementID,
+ @Parameter(in = ParameterIn.QUERY, description = "the columns to be
selected (comma-separated)") @RequestParam(value = QP_COLUMNS, required =
false) String columns,
+ @Parameter(in = ParameterIn.QUERY, description = "start date for slicing
operation") @RequestParam(value = QP_START_DATE, required = false) Long
startDate,
+ @Parameter(in = ParameterIn.QUERY, description = "end date for slicing
operation") @RequestParam(value = QP_END_DATE, required = false) Long endDate,
+ @Parameter(in = ParameterIn.QUERY, description = "page number for paging
operation") @RequestParam(value = QP_PAGE, required = false) Integer page,
+ @Parameter(in = ParameterIn.QUERY, description = "maximum number of
retrieved query results") @RequestParam(value = QP_LIMIT, required = false)
Integer limit,
+ @Parameter(in = ParameterIn.QUERY, description = "offset")
@RequestParam(value = QP_OFFSET, required = false) Integer offset,
+ @Parameter(in = ParameterIn.QUERY, description = "grouping tags
(comma-separated) for grouping operation") @RequestParam(value = QP_GROUP_BY,
required = false) String groupBy,
+ @Parameter(in = ParameterIn.QUERY, description = "ordering of retrieved
query results (ASC or DESC - default is ASC)") @RequestParam(value = QP_ORDER,
required = false) String order,
+ @Parameter(in = ParameterIn.QUERY, description = "name of aggregation
function used for grouping operation") @RequestParam(value =
QP_AGGREGATION_FUNCTION, required = false) String aggregationFunction,
+ @Parameter(in = ParameterIn.QUERY, description = "time interval for
aggregation (e.g. 1m - one minute) for grouping operation") @RequestParam(value
= QP_TIME_INTERVAL, required = false) String timeInterval,
+ @Parameter(in = ParameterIn.QUERY, description = "format specification
(csv, json - default is csv) for data download") @RequestParam(value =
QP_FORMAT, required = false) String format,
+ @Parameter(in = ParameterIn.QUERY, description = "csv delimiter (comma
or semicolon)") @RequestParam(value = QP_CSV_DELIMITER, required = false)
String csvDelimiter,
+ @Parameter(in = ParameterIn.QUERY, description = "missingValueBehaviour
(ignore or empty)") @RequestParam(value = QP_MISSING_VALUE_BEHAVIOUR, required
= false) String missingValueBehaviour,
+ @Parameter(in = ParameterIn.QUERY, description = "filter conditions (a
comma-separated list of filter conditions"
+ + "such as [field,operator,condition])") @RequestParam(value =
QP_FILTER, required = false) String filter,
+ @Parameter(in = ParameterIn.QUERY, description = "Excel export with
template") @RequestParam(value = QP_XLSX_USE_TEMPLATE, required = false)
boolean useTemplate,
+ @Parameter(in = ParameterIn.QUERY, description = "ID of the excel
template file to use") @RequestParam(value = QP_XLSX_TEMPLATE_ID, required =
false) String templateId,
+ @Parameter(in = ParameterIn.QUERY, description = "The first row in the
excel file where data should be written") @RequestParam(value =
QP_XLSX_START_ROW, required = false) Integer startRow,
+ @Parameter(in = ParameterIn.QUERY, description = "Use either label or
key as the column header") @RequestParam(value = QP_HEADER_COLUMN_NAME,
required = false) String headerColumnName,
@RequestParam Map<String, String> queryParams) {
-
if (!(checkProvidedQueryParams(queryParams))) {
throw new SpMessageException(HttpStatus.BAD_REQUEST,
Notifications.error("Wrong query parameters provided"));
} else {
@@ -363,23 +277,15 @@ public class DataLakeResource extends
AbstractRestResource {
}
}
- @PostMapping(
- path = "/measurements/{measurementID}",
- produces = MediaType.APPLICATION_JSON_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE)
- @Operation(summary = "Store a measurement series to a data lake with the
given id", tags = {"Data Lake"},
- responses = {
- @ApiResponse(
- responseCode = "400",
- description = "Can't store the given data to this data lake"),
- @ApiResponse(
- responseCode = "200",
- description = "Successfully stored data")})
+ @PostMapping(path = "/measurements/{measurementID}", produces =
MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "Store a measurement series to a data lake with the
given id", tags = {
+ "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "400", description = "Can't store the
given data to this data lake"),
+ @ApiResponse(responseCode = "200", description = "Successfully
stored data") })
public ResponseEntity<?> storeDataToMeasurement(
@PathVariable String measurementID,
@RequestBody SpQueryResult queryResult,
- @Parameter(in = ParameterIn.QUERY, description = "should not identical
schemas be stored")
- @RequestParam(value = "ignoreSchemaMismatch", required = false) boolean
ignoreSchemaMismatch) {
+ @Parameter(in = ParameterIn.QUERY, description = "should not identical
schemas be stored") @RequestParam(value = "ignoreSchemaMismatch", required =
false) boolean ignoreSchemaMismatch) {
var dataWriter = new DataLakeDataWriter(ignoreSchemaMismatch);
try {
dataWriter.writeData(measurementID, queryResult);
@@ -391,9 +297,8 @@ public class DataLakeResource extends AbstractRestResource {
}
@DeleteMapping(path = "/measurements")
- @Operation(summary = "Remove all stored measurement series from Data Lake",
tags = {"Data Lake"},
- responses = {
- @ApiResponse(responseCode = "200", description = "All measurement
series successfully removed")})
+ @Operation(summary = "Remove all stored measurement series from Data Lake",
tags = { "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "200", description = "All measurement series
successfully removed") })
public ResponseEntity<?> removeAll() {
boolean isSuccess = this.dataExplorerQueryManagement.deleteAllData();
return ResponseEntity.ok(isSuccess);
@@ -403,44 +308,54 @@ public class DataLakeResource extends
AbstractRestResource {
return SUPPORTED_PARAMS.containsAll(providedParams.keySet());
}
- @PostMapping(
- path = "/{elementId}/cleanup",
- produces = MediaType.APPLICATION_JSON_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE)
- @PreAuthorize(AuthConstants.IS_ADMIN_ROLE)
- @Operation(summary = "Sets the retention mechanism for a certain
measurement", tags = {"Data Lake"},
- responses = {
- @ApiResponse(
- responseCode = "400",
- description = "Can't store the given data to this data lake"),
- @ApiResponse(
- responseCode = "200",
- description = "Successfully stored data")})
+ @PostMapping(path = "/{elementId}/cleanup", produces =
MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
+ @PreAuthorize(AuthConstants.IS_ADMIN_ROLE)
+ @Operation(summary = "Sets the retention mechanism for a certain
measurement", tags = { "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "400", description = "Can't store the given
data to this data lake"),
+ @ApiResponse(responseCode = "200", description = "Successfully stored
data") })
public ResponseEntity<?> setDataLakeRetention(
@PathVariable String elementId,
- @RequestBody RetentionTimeConfig retention){
- var measure = this.dataExplorerSchemaManagement.getById(elementId);
- measure.setRetentionTime(retention);
- try {
- this.dataExplorerSchemaManagement.updateMeasurement(measure);
- } catch (IllegalArgumentException e) {
- return badRequest(e.getMessage());
+ @RequestBody RetentionTimeConfig retention) {
+ var measure = this.dataExplorerSchemaManagement.getById(elementId);
+ measure.setRetentionTime(retention);
+ try {
+ this.dataExplorerSchemaManagement.updateMeasurement(measure);
+ } catch (IllegalArgumentException e) {
+ return badRequest(e.getMessage());
}
-
+
return ok();
}
-@DeleteMapping(path = "/{elementId}/cleanup")
-public ResponseEntity<?> deleteDataLakeRetention(@PathVariable String
elementId) {
+ @DeleteMapping(path = "/{elementId}/cleanup")
+ public ResponseEntity<?> deleteDataLakeRetention(@PathVariable String
elementId) {
var measure = this.dataExplorerSchemaManagement.getById(elementId);
measure.deleteRetentionTime();
try {
- this.dataExplorerSchemaManagement.updateMeasurement(measure);
+ this.dataExplorerSchemaManagement.updateMeasurement(measure);
} catch (IllegalArgumentException e) {
- return badRequest(e.getMessage());
+ return badRequest(e.getMessage());
}
return ok();
-}
+ }
+
+ @PostMapping(path = "/{elementId}/runSyncNow")
+ @PreAuthorize(AuthConstants.IS_ADMIN_ROLE)
+ @Operation(summary = "Runs the retention mechanism for a certain
measurement", tags = { "Data Lake" }, responses = {
+ @ApiResponse(responseCode = "200", description = "Successfully stored
data") })
+ public ResponseEntity<?> runDataLakeRetentionNow(
+ @PathVariable String elementId) {
+
+ try {
+ var measure = this.dataExplorerSchemaManagement.getById(elementId);
+ dataLakeExportManager.cleanupSingleMeasurement(measure);
+ return ok();
+
+ } catch (Exception e) {
+ return serverError(SpLogMessage.from(e));
+ }
+
+ }
private ProvidedRestQueryParams populate(String measurementId, Map<String,
String> rawParams) {
Map<String, String> queryParamMap = new HashMap<>();
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
index 123a051a6c..581f886b63 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
@@ -18,18 +18,10 @@
package org.apache.streampipes.service.core.scheduler;
import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
-import
org.apache.streampipes.dataexplorer.export.ObjectStorge.ExportProviderFactory;
-import org.apache.streampipes.dataexplorer.export.ObjectStorge.IObjectStorage;
-import org.apache.streampipes.dataexplorer.export.OutputFormat;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
-import org.apache.streampipes.model.configuration.ExportProviderSettings;
-import org.apache.streampipes.model.configuration.ProviderType;
+import org.apache.streampipes.export.DataLakeExportManager;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
-import org.apache.streampipes.model.datalake.RetentionAction;
-import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
-import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,173 +29,47 @@ import
org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
-import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
-import java.io.IOException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
@Configuration
public class DataLakeScheduler implements SchedulingConfigurer {
- private static final Logger LOG =
LoggerFactory.getLogger(DataLakeScheduler.class);
+ private static DataLakeExportManager dataLakeExportManager = new
DataLakeExportManager();
+ private static final Logger LOG =
LoggerFactory.getLogger(DataLakeExportManager.class);
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement =
new DataExplorerDispatcher()
.getDataExplorerManager()
.getSchemaManagement();
- private final IDataExplorerQueryManagement dataExplorerQueryManagement =
new DataExplorerDispatcher()
- .getDataExplorerManager()
- .getQueryManagement(this.dataExplorerSchemaManagement);
-
- private void exportMeasurement(DataLakeMeasure dataLakeMeasure, Instant
now, long endDate) {
-
- if (System.getenv("SP_RETENTION_LOCAL_DIR") == null ||
System.getenv("SP_RETENTION_LOCAL_DIR").isEmpty()) {
- LOG.error("For Local Retention Storage, please configure the
environment variable SP_RETENTION_LOCAL_DIR");
- }
-
- var outputFormat = OutputFormat
-
.fromString(dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig().format());
-
- Map<String, String> params = new HashMap<>();
-
- params.put("delimiter",
-
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig().csvDelimiter());
- params.put("format",
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig().format());
- params.put("headerColumnName",
-
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig().headerColumnName());
- params.put("missingValueBehaviour",
-
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig()
- .missingValueBehaviour());
- params.put("endDate", Long.toString(endDate));
-
- ProvidedRestQueryParams sanitizedParams = new
ProvidedRestQueryParams(dataLakeMeasure.getMeasureName(), params);
- StreamingResponseBody streamingOutput = output ->
dataExplorerQueryManagement.getDataAsStream(
- sanitizedParams,
- outputFormat,
- "ignore".equals(
-
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig()
- .missingValueBehaviour()),
- output);
-
- String exportProviderId =
dataLakeMeasure.getRetentionTime().getRetentionExportConfig()
- .getExportProviderId();
- // FInd Item in Document
-
- List<ExportProviderSettings> exportProviders =
StorageDispatcher.INSTANCE
- .getNoSqlStore()
- .getSpCoreConfigurationStorage()
- .get()
- .getExportProviderSettings();
-
- ExportProviderSettings exportProviderSetting = null;
-
- for (int i = 0; i < exportProviders.size(); i++) {
- ExportProviderSettings existing = exportProviders.get(i);
- if (existing != null &&
existing.getProviderId().equals(exportProviderId)) {
- exportProviderSetting = existing;
- }
- }
-
- if (exportProviderSetting == null) {
- LOG.error("The desired export provider was not found. No export
has been done.");
- return;
- }
-
- ProviderType providerType = exportProviderSetting.getProviderType();
-
- LOG.info("Write to " + System.getenv("SP_RETENTION_LOCAL_DIR"));
-
- try {
-
- IObjectStorage exportProvider =
ExportProviderFactory.createExportProvider(
- providerType, dataLakeMeasure.getMeasureName(),
exportProviderSetting,
-
dataLakeMeasure.getRetentionTime().getRetentionExportConfig().getExportConfig().format());
- exportProvider.store(streamingOutput);
-
- } catch (IllegalArgumentException e) {
-
- LOG.error("Export provider could not be created. Unsupported
provider type: {}. Error: {}", providerType,
- e.getMessage(), e);
- } catch (IOException e) {
-
- LOG.error("I/O error occurred while trying to store data. Provider
Type: {}. Error: {}", providerType,
- e.getMessage(), e);
- } catch (RuntimeException e) {
- LOG.error("Runtime exception occurred while attempting to store
data. Provider Type: {}. Error: {}",
- providerType, e.getMessage(), e);
- } catch (Exception e) {
-
- LOG.error("An unexpected error occurred during export. Provider
Type: {}. Error: {}", providerType,
- e.getMessage(), e);
- }
- }
-
- private void deleteMeasurement(DataLakeMeasure dataLakeMeasure, Instant
now, long endDate) {
-
- LOG.info("Current time in millis: " + now.toEpochMilli());
- LOG.info("Current time in millis to delete: " + endDate);
-
-
this.dataExplorerQueryManagement.deleteData(dataLakeMeasure.getMeasureName(),
null, endDate);
- }
-
- private Map<String, Object> getStartAndEndTime(int olderThanDays) {
- Instant now = Instant.now();
- Instant daysAgo = now.minus(olderThanDays, ChronoUnit.DAYS);
-
- long endDate = daysAgo.toEpochMilli();
-
- Map<String, Object> result = new HashMap<>();
- result.put("now", now);
- result.put("endDate", endDate);
- return result;
- }
-
public void cleanupMeasurements() {
List<DataLakeMeasure> allMeasurements =
this.dataExplorerSchemaManagement.getAllMeasurements();
LOG.info("GET ALL Measurements");
for (DataLakeMeasure dataLakeMeasure : allMeasurements) {
-
- if (dataLakeMeasure.getRetentionTime() != null) {
- LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
-
- var result = getStartAndEndTime(
-
dataLakeMeasure.getRetentionTime().getDataRetentionConfig().olderThanDays());
- Instant now = (Instant) result.get("now");
- long endDate = (Long) result.get("endDate");
-
- if
(dataLakeMeasure.getRetentionTime().getDataRetentionConfig().action() !=
RetentionAction.DELETE) {
- LOG.info("Start saving Measurement " +
dataLakeMeasure.getMeasureName());
- exportMeasurement(dataLakeMeasure, now, endDate);
- LOG.info("Measurements " +
dataLakeMeasure.getMeasureName() + " successfully saved");
- }
- if
(dataLakeMeasure.getRetentionTime().getDataRetentionConfig().action() !=
RetentionAction.SAVE) {
- LOG.info("Start delete Measurement " +
dataLakeMeasure.getMeasureName());
- deleteMeasurement(dataLakeMeasure, now, endDate);
- LOG.info("Measurements " +
dataLakeMeasure.getMeasureName() + " successfully deleted");
- }
+ try {
+
dataLakeExportManager.cleanupSingleMeasurement(dataLakeMeasure);
+ } catch (Exception e) {
+ LOG.error(String.format("An unexpected error occurred during
export. Data Measure: %s. Error: %s",
+ dataLakeMeasure, e.getMessage()), e);
}
+
}
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
- var env = Environments.getEnvironment();
- LOG.info("Retention CRON Job triggered.");
+ var env = Environments.getEnvironment();
+ LOG.info("Retention CRON Job triggered.");
taskRegistrar.addTriggerTask(
this::cleanupMeasurements,
-
triggerContext -> new
CronTrigger(env.getDatalakeSchedulerCron().getValueOrDefault())
.nextExecution(triggerContext)
);
- LOG.info("Retention CRON Job finished.");
+ LOG.info("Retention CRON Job finished.");
}
}
\ No newline at end of file
diff --git a/ui/deployment/i18n/de.json b/ui/deployment/i18n/de.json
index 65388b02d6..ad37521bea 100644
--- a/ui/deployment/i18n/de.json
+++ b/ui/deployment/i18n/de.json
@@ -668,7 +668,14 @@
"This operation cannot be undone. Please ensure that the data provider is
not used in a datalake retention.": "Dieser Vorgang kann nicht rückgängig
gemacht werden. Bitte stellen Sie sicher, dass der Provider in keiner
Speicherrichtlinie verwendet wird.",
"Delete Data": "Daten löschen",
"Truncate Data": "Daten leeren",
+ "Date": "Datum",
+ "Path": "Pfad",
+ "State": "Status",
+ "Success": "Erfolg",
+ "Fail": "Fehler",
+ "Error": "Fehler",
"Start Sync": "Synchronisierung starten",
+ "Run Sync Now": "Sync jetzt ausführen",
"Delete Sync": "Sync löschen",
"Data Retention Action": "Aktion",
"delete": "löschen",
@@ -695,6 +702,8 @@
"Remove index from database": "Index aus der Datenbank löschen",
"Retention Rate": "Speicherrichtlinie",
"Set retention rate": "Speicherrichtlinie festlegen",
+ "Retention Log": "Speicherprotokoll",
+ "Open Retention Log": "Speicherprotokoll öffnen",
"(no stored measurements)": "(keine gespeicherten Measurements)",
"Export Providers": "Exportanbieter",
"Add, Edit, and Delete export providers used for backing up data lakes.":
"Hinzufügen, Bearbeiten und Löschen von Exportanbietern, die für die Sicherung
von Data Lakes verwendet werden.",
@@ -838,7 +847,7 @@
"Truncate data": "Daten leeren",
"Delete data": "Daten löschen",
"Delete Export Provider": "Exportanbieter löschen",
- " Test Export Provider Connection": " Test der Export-Provider-Verbindung",
+ "Test Export Provider Connection": "Test der Export-Provider-Verbindung",
"Set Data Retention": "Speicherrichtlinie bearbeiten",
"Owner": "Eigentümer",
"Public Element": "Öffentliches Element",
@@ -876,7 +885,6 @@
"Apply": "Anwenden",
"Refresh interval": "Aktualisierungsintervall",
"No entries available.": "Keine Einträge vorhanden.",
- "Error": "Fehler",
"Probable cause": "Wahrscheinliche Ursache",
"No more information": "Keine weiteren Informationen",
"Full details": "Alle Einzelheiten",
diff --git a/ui/deployment/i18n/en.json b/ui/deployment/i18n/en.json
index c5223035e5..1d8603c6da 100644
--- a/ui/deployment/i18n/en.json
+++ b/ui/deployment/i18n/en.json
@@ -668,7 +668,14 @@
"This operation cannot be undone. Please ensure that the data provider is
not used in a datalake retention.": null,
"Delete Data": null,
"Truncate Data": null,
+ "Date": null,
+ "Path": null,
+ "State": null,
+ "Success": null,
+ "Fail": null,
+ "Error": null,
"Start Sync": null,
+ "Run Sync Now": null,
"Delete Sync": null,
"Data Retention Action": null,
"delete": null,
@@ -695,6 +702,8 @@
"Remove index from database": null,
"Retention Rate": null,
"Set retention rate": null,
+ "Retention Log": null,
+ "Open Retention Log": null,
"(no stored measurements)": null,
"Export Providers": null,
"Add, Edit, and Delete export providers used for backing up data lakes.":
null,
@@ -834,11 +843,14 @@
"Connection could not be established.": null,
"Truncating data...": null,
"Deleting data...": null,
+ "Sync was successful.": null,
+ "Sync was not successful": null,
+ "Export Data": null,
"New Export Provider": null,
"Truncate data": null,
"Delete data": null,
"Delete Export Provider": null,
- " Test Export Provider Connection": null,
+ "Test Export Provider Connection": null,
"Set Data Retention": null,
"Owner": null,
"Public Element": null,
@@ -876,7 +888,6 @@
"Apply": null,
"Refresh interval": null,
"No entries available.": null,
- "Error": null,
"Probable cause": null,
"No more information": null,
"Full details": null,
diff --git
a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
index e5eeb66950..bade947937 100644
---
a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
@@ -175,6 +175,13 @@ export class DatalakeRestService {
return this.http.delete(url);
}
+ runCleanupNow(index: string) {
+ const url = `${this.dataLakeUrl}/${index}/runSyncNow`;
+ const request = new HttpRequest('POST', url, {});
+
+ return this.http.request(request);
+ }
+
buildDownloadRequest(index: string, queryParams: any) {
const url = this.dataLakeUrl + '/measurements/' + index + '/download';
const request = new HttpRequest('GET', url, {
diff --git
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts
index 36eacaf170..b5ef82ab32 100644
---
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 3.2.1263 on 2025-10-23
20:03:54.
+// Generated using typescript-generator version 3.2.1263 on 2025-11-20
09:13:04.
-import { Storable } from './streampipes-model';
+import { Storable } from './platform-services';
export class Group implements Storable {
alternateIds: string[];
diff --git
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index de2fa3210a..131db4da40 100644
---
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
@@ -3475,6 +3474,8 @@ export class ResourceMetadata {
export class RetentionExportConfig {
exportConfig: ExportConfig;
exportProviderId: string;
+ lastExport: string;
+ retentionLog: RetentionLog[];
static fromData(
data: RetentionExportConfig,
@@ -3486,6 +3487,29 @@ export class RetentionExportConfig {
const instance = target || new RetentionExportConfig();
instance.exportConfig = ExportConfig.fromData(data.exportConfig);
instance.exportProviderId = data.exportProviderId;
+ instance.lastExport = data.lastExport;
+ instance.retentionLog = __getCopyArrayFn(RetentionLog.fromData)(
+ data.retentionLog,
+ );
+ return instance;
+ }
+}
+
+export class RetentionLog {
+ date: string;
+ error: string;
+ name: string;
+ status: boolean;
+
+ static fromData(data: RetentionLog, target?: RetentionLog): RetentionLog {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new RetentionLog();
+ instance.date = data.date;
+ instance.error = data.error;
+ instance.name = data.name;
+ instance.status = data.status;
return instance;
}
}
diff --git a/ui/src/app/configuration/configuration.module.ts
b/ui/src/app/configuration/configuration.module.ts
index dadfbcac9c..33ec773710 100644
--- a/ui/src/app/configuration/configuration.module.ts
+++ b/ui/src/app/configuration/configuration.module.ts
@@ -114,6 +114,8 @@ import { SelectDataRetentionComponent } from
'./dialog/data-retention-dialog/com
import { SelectRetentionActionComponent } from
'./dialog/data-retention-dialog/components/select-retention/select-retention-action/select-retention-action.component';
import { SelectDataExportComponent } from
'./dialog/data-retention-dialog/components/select-export/select-format.component';
import { DeleteExportProviderComponent } from
'./dialog/delete-export-provider/delete-export-provider-dialog.component';
+import { DataRetentionNowDialogComponent } from
'./dialog/data-retention-now-dialog/data-retention-now-dialog.component';
+import { DataRetentionLogDialogComponent } from
'./dialog/data-retention-log-dialog/data-retention-log-dialog.component';
@NgModule({
imports: [
CommonModule,
@@ -225,6 +227,8 @@ import { DeleteExportProviderComponent } from
'./dialog/delete-export-provider/d
ServiceConfigsItemComponent,
ServiceConfigsNumberComponent,
DeleteDatalakeIndexComponent,
+ DataRetentionNowDialogComponent,
+ DataRetentionLogDialogComponent,
ExportProviderConnectionTestComponent,
EditAssetLocationComponent,
EditAssetLocationAreaComponent,
diff --git
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html
index 3a9f98b00a..1531206b7e 100644
---
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html
+++
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html
@@ -246,6 +246,82 @@
</td>
</ng-container>
+ <ng-container matColumnDef="retentionlog">
+ <th mat-header-cell *matHeaderCellDef>
+ {{ 'Retention Log' | translate }}
+ </th>
+ <td mat-cell *matCellDef="let configurationEntry">
+ <div fxLayout="row">
+ <span
+ fxFlex
+ fxFlexOrder="3"
+ fxLayout="row"
+ fxLayoutAlign="start center"
+ >
+ @if (configurationEntry?.retention) {
+ <button
+ color="accent"
+ mat-icon-button
+ [matTooltip]="
+ ('Open Retention Log'
+ | translate) +
+ (configurationEntry
+ ?.retention
+ ?.retentionExportConfig
+ ?.lastExport
+ ? ' • ' +
+ (configurationEntry
+ .retention
+
.retentionExportConfig
+ .lastExport
+ | date
+ :
'yyyy-MM-dd HH:mm:ss')
+ : '')
+ "
+
data-cy="datalake-retention-log-btn"
+ matTooltipPosition="above"
+ (click)="
+ openRetentionLog(
+ configurationEntry
+ ?.retention
+
.retentionExportConfig
+ .retentionLog
+ )
+ "
+ >
+ <i
+ class="material-icons"
+ [ngStyle]="{
+ color:
+ configurationEntry
+ ?.retention
+
?.retentionExportConfig
+ ?.retentionLog
+ ?.length &&
+ configurationEntry
+ .retention
+
.retentionExportConfig
+ .retentionLog[
+
configurationEntry
+ .retention
+
.retentionExportConfig
+
.retentionLog
+ .length - 1
+ ].status
+ ? 'green'
+ : 'red'
+ }"
+ >list_alt</i
+ >
+ </button>
+ } @else {
+ <p>-</p>
+ }
+ </span>
+ </div>
+ </td>
+ </ng-container>
+
<tr
mat-header-row
*matHeaderRowDef="displayedColumns"
diff --git
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
index bf28431c6e..12af50c644 100644
---
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
+++
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
@@ -30,6 +30,7 @@ import {
DatalakeRestService,
ExportProviderSettings,
ExportProviderService,
+ RetentionLog,
} from '@streampipes/platform-services';
import { MatPaginator } from '@angular/material/paginator';
import { MatSort } from '@angular/material/sort';
@@ -49,6 +50,7 @@ import { ExportProviderComponent } from
'../dialog/export-provider-dialog/export
import { DeleteExportProviderComponent } from
'../dialog/delete-export-provider/delete-export-provider-dialog.component';
import { TranslateService } from '@ngx-translate/core';
import { ExportProviderConnectionTestComponent } from
'../dialog/export-provider-connection-test/export-provider-connection-test.component';
+import { DataRetentionLogDialogComponent } from
'../dialog/data-retention-log-dialog/data-retention-log-dialog.component';
@Component({
selector: 'sp-datalake-configuration',
@@ -87,6 +89,7 @@ export class DatalakeConfigurationComponent implements OnInit
{
'truncate',
'remove',
'retention',
+ 'retentionlog',
];
displayedColumnsExport: string[] = [
@@ -139,6 +142,7 @@ export class DatalakeConfigurationComponent implements
OnInit {
if (measurement?.retentionTime != null) {
entry.retention = measurement.retentionTime;
}
+ console.log(entry.retention);
inUseMeasurements.forEach(inUseMeasurement => {
if (
inUseMeasurement.measureName ===
@@ -295,6 +299,26 @@ export class DatalakeConfigurationComponent implements
OnInit {
});
}
+ openRetentionLog(retentionLog: RetentionLog[]) {
+ const dialogRef: DialogRef<DataRetentionLogDialogComponent> =
+ this.dialogService.open(DataRetentionLogDialogComponent, {
+ panelType: PanelType.STANDARD_PANEL,
+ title: this.translateService.instant('Retention Log'),
+ width: '100vw',
+ data: {
+ retentionLog: retentionLog,
+ },
+ });
+
+ dialogRef.afterClosed().subscribe(data => {
+ if (data) {
+ setTimeout(() => {
+ this.loadAvailableMeasurements();
+ }, 1000);
+ }
+ });
+ }
+
onPageChange(event: any) {
this.pageIndex = event.pageIndex;
this.receiveMeasurementSizes(this.pageIndex);
diff --git
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
index dbc37cfd95..e65125852e 100644
---
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
+++
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
@@ -59,12 +59,23 @@
<button
mat-flat-button
color="accent"
- data-cy="download-configuration-download-btn"
+ data-cy="run-retention-now-btn"
+ [disabled]="!disableDelete"
+ (click)="runCleanUpNow()"
+ >
+ {{ 'Run Sync Now' | translate }}
+ </button>
+
+ <button
+ mat-flat-button
+ color="accent"
+ data-cy="delete-retention-configuration-btn"
[disabled]="!disableDelete"
(click)="deleteCleanUp()"
>
{{ 'Delete Sync' | translate }}
</button>
+
<button
mat-flat-button
class="mat-basic"
diff --git
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
index 5ccf811935..052f27c009 100644
---
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
+++
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
@@ -17,13 +17,15 @@
*/
import { Component, inject, Input, OnInit, ViewChild } from '@angular/core';
-import { DialogRef } from '@streampipes/shared-ui';
+import { DialogRef, DialogService, PanelType } from '@streampipes/shared-ui';
import { DataRetentionDialogModel } from './model/data-retention-dialog.model';
import {
DatalakeRestService,
ExportProviderSettings,
RetentionTimeConfig,
} from '@streampipes/platform-services';
+import { TranslateService } from '@ngx-translate/core';
+import { DataRetentionNowDialogComponent } from
'../data-retention-now-dialog/data-retention-now-dialog.component';
@Component({
selector: 'sp-data-retention-dialog',
@@ -43,6 +45,8 @@ export class DataRetentionDialogComponent implements OnInit {
disableDelete = false;
dialogRef = inject(DialogRef<DataRetentionDialogComponent>);
+ translateService = inject(TranslateService);
+ dialogService = inject(DialogService);
datalakeRestService = inject(DatalakeRestService);
ngOnInit() {
@@ -109,6 +113,21 @@ export class DataRetentionDialogComponent implements
OnInit {
this.close(true);
});
}
+ runCleanUpNow() {
+ const dialogRef: DialogRef<DataRetentionNowDialogComponent> =
+ this.dialogService.open(DataRetentionNowDialogComponent, {
+ panelType: PanelType.STANDARD_PANEL,
+ title: this.translateService.instant('Export Data'),
+ width: '70vw',
+ data: {
+ measurementIndex: this.measurementIndex,
+ },
+ });
+
+ dialogRef.afterClosed().subscribe(data => {
+ this.close(true);
+ });
+ }
requiresExportValidation(): boolean {
const action = this.retentionConfig?.dataRetentionConfig?.action;
diff --git
a/ui/src/app/configuration/dialog/data-retention-log-dialog/data-retention-log-dialog.component.html
b/ui/src/app/configuration/dialog/data-retention-log-dialog/data-retention-log-dialog.component.html
new file mode 100644
index 0000000000..98553bfb18
--- /dev/null
+++
b/ui/src/app/configuration/dialog/data-retention-log-dialog/data-retention-log-dialog.component.html
@@ -0,0 +1,97 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<div class="sp-dialog-container">
+ <div class="sp-dialog-content p-15">
+ <table
+ fxFlex="100"
+ mat-table
+ data-cy="retentionLog"
+ [dataSource]="retentionLog"
+ style="width: 100%"
+ matSort
+ >
+ <ng-container matColumnDef="date">
+ <th mat-header-cell mat-sort-header *matHeaderCellDef>
+ {{ 'Date' | translate }}
+ </th>
+ <td mat-cell *matCellDef="let configurationEntry">
+ {{
+ configurationEntry.date
+ | date: 'yyyy-MM-dd HH:mm:ss' : ''
+ }}
+ </td>
+ </ng-container>
+
+ <ng-container matColumnDef="path">
+ <th mat-header-cell mat-sort-header *matHeaderCellDef>
+ {{ 'Path' | translate }}
+ </th>
+ <td mat-cell *matCellDef="let configurationEntry">
+ {{ configurationEntry.name }}
+ </td>
+ </ng-container>
+
+ <ng-container matColumnDef="state">
+ <th mat-header-cell mat-sort-header *matHeaderCellDef>
+ {{ 'State' | translate }}
+ </th>
+ <td
+ mat-cell
+ *matCellDef="let configurationEntry"
+ [style.color]="configurationEntry.status ? 'green' : 'red'"
+ >
+ {{
+ configurationEntry.status
+ ? ('Success' | translate)
+ : ('Fail' | translate)
+ }}
+ </td>
+ </ng-container>
+
+ <ng-container matColumnDef="error">
+ <th mat-header-cell mat-sort-header *matHeaderCellDef>
+ {{ 'Error' | translate }}
+ </th>
+ <td mat-cell *matCellDef="let configurationEntry">
+ {{
+ configurationEntry?.error
+ ? configurationEntry.error
+ : '-'
+ }}
+ </td>
+ </ng-container>
+
+ <tr mat-header-row *matHeaderRowDef="displayedColumns"></tr>
+ <tr mat-row *matRowDef="let row; columns: displayedColumns"></tr>
+ </table>
+ </div>
+
+ <mat-divider></mat-divider>
+
+ <div class="sp-dialog-actions actions-align-right">
+ <button
+ mat-button
+ mat-flat-button
+ class="mat-basic"
+ (click)="close(false)"
+ >
+ {{ 'Close' | translate }}
+ </button>
+ </div>
+</div>
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
b/ui/src/app/configuration/dialog/data-retention-log-dialog/data-retention-log-dialog.component.ts
similarity index 50%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
copy to
ui/src/app/configuration/dialog/data-retention-log-dialog/data-retention-log-dialog.component.ts
index f126a63744..56b6beac95 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionExportConfig.java
+++
b/ui/src/app/configuration/dialog/data-retention-log-dialog/data-retention-log-dialog.component.ts
@@ -16,29 +16,29 @@
*
*/
-package org.apache.streampipes.model.datalake;
-public class RetentionExportConfig {
-
- private ExportConfig exportConfig;
- private String exportProviderId;
+import { Component, inject, Input, OnInit } from '@angular/core';
+import { DialogRef } from '@streampipes/shared-ui';
+import {
+ DatalakeRestService,
+ ExportProviderService,
+} from '@streampipes/platform-services';
+import { TranslateService } from '@ngx-translate/core';
- public RetentionExportConfig(ExportConfig exportConfig, String
exportProviderId) {
- this.exportConfig = exportConfig;
- this.exportProviderId = exportProviderId;
- }
+@Component({
+ selector: 'sp-data-retention-log-dialog',
+ templateUrl: './data-retention-log-dialog.component.html',
+ standalone: false,
+})
+export class DataRetentionLogDialogComponent {
+ @Input()
+ retentionLog: string;
- public ExportConfig getExportConfig() {
- return exportConfig;
- }
+ displayedColumns: string[] = ['date', 'path', 'state', 'error'];
- public void setExportConfig(ExportConfig exportConfig) {
- this.exportConfig = exportConfig;
- }
- public String getExportProviderId() {
- return exportProviderId;
- }
+ private dialogRef = inject(DialogRef<DataRetentionLogDialogComponent>);
+ private translateService = inject(TranslateService);
- public void setExportProviderId(String exportProviderId) {
- this.exportProviderId = exportProviderId;
+ close(refreshDataLakeIndex: boolean) {
+ this.dialogRef.close(refreshDataLakeIndex);
}
-}
\ No newline at end of file
+}
diff --git
a/ui/src/app/configuration/dialog/data-retention-now-dialog/data-retention-now-dialog.component.html
b/ui/src/app/configuration/dialog/data-retention-now-dialog/data-retention-now-dialog.component.html
new file mode 100644
index 0000000000..ae5ef9da4f
--- /dev/null
+++
b/ui/src/app/configuration/dialog/data-retention-now-dialog/data-retention-now-dialog.component.html
@@ -0,0 +1,81 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<div class="sp-dialog-container">
+ <div class="sp-dialog-content p-15">
+ <div
+ fxLayoutAlign="center center"
+ fxLayout="column"
+ data-cy="connection-testing-in-progress"
+ >
+ @if (!isInProgress) {
+ <div fxLayout="row" fxLayoutAlign="space-around">
+ <h4>
+ <b>{{ currentStatus }}</b>
+ </h4>
+ </div>
+ }
+ </div>
+
+ <div
+ fxLayoutAlign="center center"
+ fxLayout="column"
+ data-cy="connection-testing-in-progress"
+ >
+ <div fxLayout="row" fxLayoutAlign="space-around">
+ {{ filePath }}
+ </div>
+ </div>
+
+ @if (isError) {
+ <div class="sp-dialog-content p-20">
+ <sp-exception-details [title]="title" [message]="errorMessage">
+ </sp-exception-details>
+ </div>
+ }
+
+ @if (isInProgress) {
+ <div
+ fxLayoutAlign="center center"
+ fxLayout="column"
+ data-cy="connection-testing-in-progress"
+ >
+ <div fxLayout="row" fxLayoutAlign="space-around">
+ <mat-spinner
+ [mode]="'indeterminate'"
+ color="accent"
+ diameter="25"
+ ></mat-spinner>
+ </div>
+ </div>
+ }
+ </div>
+
+ <mat-divider></mat-divider>
+
+ <div class="sp-dialog-actions actions-align-right">
+ <button
+ mat-button
+ mat-flat-button
+ class="mat-basic"
+ (click)="close(false)"
+ >
+ {{ 'Close' | translate }}
+ </button>
+ </div>
+</div>
diff --git
a/ui/src/app/configuration/dialog/data-retention-now-dialog/data-retention-now-dialog.component.ts
b/ui/src/app/configuration/dialog/data-retention-now-dialog/data-retention-now-dialog.component.ts
new file mode 100644
index 0000000000..dcc264f2e8
--- /dev/null
+++
b/ui/src/app/configuration/dialog/data-retention-now-dialog/data-retention-now-dialog.component.ts
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { Component, inject, Input, OnInit } from '@angular/core';
+import { DialogRef } from '@streampipes/shared-ui';
+import {
+ DatalakeRestService,
+ ExportProviderService,
+} from '@streampipes/platform-services';
+import { TranslateService } from '@ngx-translate/core';
+import { finalize } from 'rxjs';
+
+@Component({
+ selector: 'sp-data-retention-now-dialog',
+ templateUrl: './data-retention-now-dialog.component.html',
+ standalone: false,
+})
+export class DataRetentionNowDialogComponent implements OnInit {
+ @Input()
+ measurementIndex: string;
+
+ datalakeRestService = inject(DatalakeRestService);
+ private dialogRef = inject(DialogRef<DataRetentionNowDialogComponent>);
+ private translateService = inject(TranslateService);
+
+ isInProgress = true;
+ currentStatus: string = '';
+ errorMessage = '';
+ isError = false;
+ message = '';
+ filePath = '';
+
+ ngOnInit(): void {
+ this.isInProgress = true;
+ this.isError = false;
+
+ this.datalakeRestService
+ .runCleanupNow(this.measurementIndex)
+ .pipe(finalize(() => (this.isInProgress = false)))
+ .subscribe(
+ data => {
+ this.isError = false;
+ this.currentStatus = this.translateService.instant(
+ 'Sync was successful.',
+ );
+ },
+ errorMessage => {
+ this.isError = true;
+ this.errorMessage = errorMessage.error;
+ this.currentStatus = this.translateService.instant(
+ 'Sync was not successful',
+ );
+ },
+ );
+ }
+
+ close(refreshDataLakeIndex: boolean) {
+ this.dialogRef.close(refreshDataLakeIndex);
+ }
+}