This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 03ee016dc9 Small fixes for the export provider settings and the
retention mechanism (#3870)
03ee016dc9 is described below
commit 03ee016dc9d11f5a10731a62eae98c1de43ced6a
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Wed Oct 22 12:43:08 2025 +0200
Small fixes for the export provider settings and the retention mechanism
(#3870)
Co-authored-by: Philipp Zehnder <[email protected]>
---
.../sinks/internal/jvm/datalake/DataLakeSink.java | 23 +++++++++++++-
.../model/datalake/DataLakeMeasure.java | 8 +++++
.../service/core/scheduler/DataLakeScheduler.java | 6 +++-
.../datalake-configuration.component.ts | 37 ++++++++++++++++------
.../data-retention-dialog.component.html | 1 +
.../data-retention-dialog.component.ts | 13 ++++++--
.../export-provider-dialog.component.ts | 2 +-
7 files changed, 75 insertions(+), 15 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 306327cca8..1fbb400dd8 100644
---
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -18,9 +18,11 @@
package org.apache.streampipes.sinks.internal.jvm.datalake;
+import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
@@ -28,6 +30,7 @@ import
org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import
org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
+import org.apache.streampipes.model.datalake.RetentionTimeConfig;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
@@ -92,6 +95,7 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
@Override
public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
+
var extractor = parameters.extractor();
var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY,
String.class);
@@ -106,7 +110,9 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
this.ensureMeasurementPropertiesExist(eventSchema);
- var measure = new DataLakeMeasure(measureName, timestampField,
eventSchema);
+ var retentionTimeConfig = getRetentionTime(measureName,
runtimeContext.getStreamPipesClient());
+
+ var measure = new DataLakeMeasure(measureName, timestampField,
eventSchema, retentionTimeConfig);
var schemaUpdateOptionString =
extractor.selectedSingleValue(SCHEMA_UPDATE_KEY, String.class);
@@ -151,6 +157,21 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
return staticProperty;
}
+ private RetentionTimeConfig getRetentionTime(String measureName,
IStreamPipesClient client){
+
+ IDataExplorerSchemaManagement dataExplorerSchemaManagement = new
DataExplorerDispatcher().getDataExplorerManager()
+ .getSchemaManagement();
+
+ var originalMeasure =
dataExplorerSchemaManagement.getExistingMeasureByName(measureName);
+
+ RetentionTimeConfig retentionTime = null;
+
+ if (originalMeasure.isPresent()){
+ retentionTime = originalMeasure.get().getRetentionTime();
+ }
+ return retentionTime;
+ }
+
/**
* Assigns property scopes to event properties based on the provided list of
dimension names.
*
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
index ae832d38ea..a413a664b1 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
@@ -76,6 +76,14 @@ public class DataLakeMeasure implements Storable {
this.timestampField = timestampField;
}
+
+ public DataLakeMeasure(String measureName, String timestampField,
EventSchema eventSchema, RetentionTimeConfig retentionTime) {
+ this.measureName = measureName;
+ this.eventSchema = eventSchema;
+ this.timestampField = timestampField;
+ this.retentionTime = retentionTime;
+ }
+
public String getMeasureName() {
return measureName;
}
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
index 61d08936c4..123a051a6c 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
@@ -166,8 +166,9 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
List<DataLakeMeasure> allMeasurements =
this.dataExplorerSchemaManagement.getAllMeasurements();
LOG.info("GET ALL Measurements");
for (DataLakeMeasure dataLakeMeasure : allMeasurements) {
- LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
+
if (dataLakeMeasure.getRetentionTime() != null) {
+ LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
var result = getStartAndEndTime(
dataLakeMeasure.getRetentionTime().getDataRetentionConfig().olderThanDays());
@@ -191,6 +192,7 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
var env = Environments.getEnvironment();
+ LOG.info("Retention CRON Job triggered.");
taskRegistrar.addTriggerTask(
this::cleanupMeasurements,
@@ -201,5 +203,7 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
);
+ LOG.info("Retention CRON Job finished.");
+
}
}
\ No newline at end of file
diff --git
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
index 4738f77872..4bf20ecb24 100644
---
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
+++
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
@@ -16,7 +16,13 @@
*
*/
-import { Component, inject, OnInit, ViewChild } from '@angular/core';
+import {
+ ChangeDetectorRef,
+ Component,
+ inject,
+ OnInit,
+ ViewChild,
+} from '@angular/core';
import { MatTableDataSource } from '@angular/material/table';
import { DataLakeConfigurationEntry } from './datalake-configuration-entry';
import {
@@ -42,6 +48,7 @@ import { DataRetentionDialogComponent } from
'../dialog/data-retention-dialog/da
import { ExportProviderComponent } from
'../dialog/export-provider-dialog/export-provider-dialog.component';
import { DeleteExportProviderComponent } from
'../dialog/delete-export-provider/delete-export-provider-dialog.component';
import { TranslateService } from '@ngx-translate/core';
+import { delay } from 'rxjs';
@Component({
selector: 'sp-datalake-configuration',
@@ -62,6 +69,7 @@ export class DatalakeConfigurationComponent implements OnInit
{
private tabService = inject(SpConfigurationTabsService);
private exportProviderRestService = inject(ExportProviderService);
private translateService = inject(TranslateService);
+ private cdr = inject(ChangeDetectorRef);
dataSource: MatTableDataSource<DataLakeConfigurationEntry> =
new MatTableDataSource([]);
@@ -245,16 +253,25 @@ export class DatalakeConfigurationComponent implements
OnInit {
}
openRetentionDialog(measurementId: string) {
- this.dialogService.open(DataRetentionDialogComponent, {
- panelType: PanelType.SLIDE_IN_PANEL,
- title: this.translateService.instant('Set Data Retention'),
- width: '50vw',
- data: {
- dataRetentionDialogModel: {
- measureName: measurementId,
+ const dialogRef: DialogRef<DataRetentionDialogComponent> =
+ this.dialogService.open(DataRetentionDialogComponent, {
+ panelType: PanelType.SLIDE_IN_PANEL,
+ title: this.translateService.instant('Set Data Retention'),
+ width: '50vw',
+ data: {
+ dataRetentionDialogModel: {
+ measureName: measurementId,
+ },
+ measurementIndex: measurementId,
},
- measurementIndex: measurementId,
- },
+ });
+
+ dialogRef.afterClosed().subscribe(data => {
+ if (data) {
+ setTimeout(() => {
+ this.loadAvailableMeasurements();
+ }, 1000);
+ }
});
}
diff --git
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
index b190cdd024..66028ccd7c 100644
---
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
+++
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
@@ -61,6 +61,7 @@
mat-flat-button
color="accent"
data-cy="download-configuration-download-btn"
+ [disabled]="!disableDelete"
(click)="deleteCleanUp()"
>
{{ 'Delete Sync' | translate }}
diff --git
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
index 2db4496f78..5ccf811935 100644
---
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
+++
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
@@ -40,6 +40,8 @@ export class DataRetentionDialogComponent implements OnInit {
@Input()
measurementIndex: string;
+ disableDelete = false;
+
dialogRef = inject(DialogRef<DataRetentionDialogComponent>);
datalakeRestService = inject(DatalakeRestService);
@@ -52,6 +54,7 @@ export class DataRetentionDialogComponent implements OnInit {
measure?.retentionTime ||
measure.retentionTime != null
) {
+ this.disableDelete = true;
this.retentionConfig ??= measure.retentionTime;
} else {
this.retentionConfig ??= RetentionTimeConfig.fromData({
@@ -89,8 +92,13 @@ export class DataRetentionDialogComponent implements OnInit {
setCleanUp() {
this.datalakeRestService
.cleanup(this.measurementIndex, this.retentionConfig)
- .subscribe(data => {
- this.close(true);
+ .subscribe({
+ next: data => {
+ this.close(true);
+ },
+ error: err => {
+ this.close(false);
+ },
});
}
@@ -112,6 +120,7 @@ export class DataRetentionDialogComponent implements OnInit
{
this.retentionConfig?.retentionExportConfig?.exportConfig;
const providerId =
this.retentionConfig?.retentionExportConfig?.exportProviderId;
+
if (!exportConfig?.format) {
console.error('Export format is required.');
return false;
diff --git
a/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
b/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
index 5305516f89..a9a2dd74f8 100644
---
a/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
+++
b/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
@@ -66,7 +66,7 @@ export class ExportProviderComponent implements OnInit {
secretKey: ['', Validators.required],
endPoint: ['', [Validators.required, this.uriValidator]],
bucketName: ['', Validators.required],
- awsRegion: ['US_EAST_1', Validators.required],
+ awsRegion: ['us-east-1', Validators.required],
providerId: [''],
secretEncrypted: [false],
});