This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 03ee016dc9 Small fixes for the export provider settings and the 
retention mechanism (#3870)
03ee016dc9 is described below

commit 03ee016dc9d11f5a10731a62eae98c1de43ced6a
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Wed Oct 22 12:43:08 2025 +0200

    Small fixes for the export provider settings and the retention mechanism 
(#3870)
    
    Co-authored-by: Philipp Zehnder <[email protected]>
---
 .../sinks/internal/jvm/datalake/DataLakeSink.java  | 23 +++++++++++++-
 .../model/datalake/DataLakeMeasure.java            |  8 +++++
 .../service/core/scheduler/DataLakeScheduler.java  |  6 +++-
 .../datalake-configuration.component.ts            | 37 ++++++++++++++++------
 .../data-retention-dialog.component.html           |  1 +
 .../data-retention-dialog.component.ts             | 13 ++++++--
 .../export-provider-dialog.component.ts            |  2 +-
 7 files changed, 75 insertions(+), 15 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 306327cca8..1fbb400dd8 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -18,9 +18,11 @@
 
 package org.apache.streampipes.sinks.internal.jvm.datalake;
 
+import org.apache.streampipes.client.api.IStreamPipesClient;
 import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
@@ -28,6 +30,7 @@ import 
org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import 
org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
+import org.apache.streampipes.model.datalake.RetentionTimeConfig;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
@@ -92,6 +95,7 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
 
   @Override
   public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
+  
     var extractor = parameters.extractor();
     var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
     var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, 
String.class);
@@ -106,7 +110,9 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
 
     this.ensureMeasurementPropertiesExist(eventSchema);
 
-    var measure = new DataLakeMeasure(measureName, timestampField, 
eventSchema);
+    var retentionTimeConfig = getRetentionTime(measureName, 
runtimeContext.getStreamPipesClient());
+
+    var measure = new DataLakeMeasure(measureName, timestampField, 
eventSchema, retentionTimeConfig);
 
     var schemaUpdateOptionString = 
extractor.selectedSingleValue(SCHEMA_UPDATE_KEY, String.class);
 
@@ -151,6 +157,21 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
     return staticProperty;
   }
 
+  private RetentionTimeConfig getRetentionTime(String measureName, 
IStreamPipesClient client){
+
+    IDataExplorerSchemaManagement dataExplorerSchemaManagement = new 
DataExplorerDispatcher().getDataExplorerManager()
+        .getSchemaManagement();
+
+    var originalMeasure = 
dataExplorerSchemaManagement.getExistingMeasureByName(measureName);
+
+    RetentionTimeConfig retentionTime = null;
+
+    if (originalMeasure.isPresent()){
+      retentionTime = originalMeasure.get().getRetentionTime();
+    }
+  return retentionTime;
+  }
+
   /**
    * Assigns property scopes to event properties based on the provided list of 
dimension names.
    *
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
index ae832d38ea..a413a664b1 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
@@ -76,6 +76,14 @@ public class DataLakeMeasure implements Storable {
     this.timestampField = timestampField;
   }
 
+
+  public DataLakeMeasure(String measureName, String timestampField, 
EventSchema eventSchema, RetentionTimeConfig retentionTime) {
+    this.measureName = measureName;
+    this.eventSchema = eventSchema;
+    this.timestampField = timestampField;
+    this.retentionTime = retentionTime;
+  }
+
   public String getMeasureName() {
     return measureName;
   }
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
index 61d08936c4..123a051a6c 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
@@ -166,8 +166,9 @@ public class DataLakeScheduler implements 
SchedulingConfigurer {
         List<DataLakeMeasure> allMeasurements = 
this.dataExplorerSchemaManagement.getAllMeasurements();
         LOG.info("GET ALL Measurements");
         for (DataLakeMeasure dataLakeMeasure : allMeasurements) {
-            LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
+            
             if (dataLakeMeasure.getRetentionTime() != null) {
+                LOG.info("Measurement " + dataLakeMeasure.getMeasureName());
 
                 var result = getStartAndEndTime(
                         
dataLakeMeasure.getRetentionTime().getDataRetentionConfig().olderThanDays());
@@ -191,6 +192,7 @@ public class DataLakeScheduler implements 
SchedulingConfigurer {
     @Override
     public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
         var env = Environments.getEnvironment(); 
+         LOG.info("Retention CRON Job triggered.");
         taskRegistrar.addTriggerTask(
 
                 this::cleanupMeasurements,
@@ -201,5 +203,7 @@ public class DataLakeScheduler implements 
SchedulingConfigurer {
 
         );
 
+         LOG.info("Retention CRON Job finished.");
+
     }
 }
\ No newline at end of file
diff --git 
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
 
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
index 4738f77872..4bf20ecb24 100644
--- 
a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
+++ 
b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.ts
@@ -16,7 +16,13 @@
  *
  */
 
-import { Component, inject, OnInit, ViewChild } from '@angular/core';
+import {
+    ChangeDetectorRef,
+    Component,
+    inject,
+    OnInit,
+    ViewChild,
+} from '@angular/core';
 import { MatTableDataSource } from '@angular/material/table';
 import { DataLakeConfigurationEntry } from './datalake-configuration-entry';
 import {
@@ -42,6 +48,7 @@ import { DataRetentionDialogComponent } from 
'../dialog/data-retention-dialog/da
 import { ExportProviderComponent } from 
'../dialog/export-provider-dialog/export-provider-dialog.component';
 import { DeleteExportProviderComponent } from 
'../dialog/delete-export-provider/delete-export-provider-dialog.component';
 import { TranslateService } from '@ngx-translate/core';
+import { delay } from 'rxjs';
 
 @Component({
     selector: 'sp-datalake-configuration',
@@ -62,6 +69,7 @@ export class DatalakeConfigurationComponent implements OnInit 
{
     private tabService = inject(SpConfigurationTabsService);
     private exportProviderRestService = inject(ExportProviderService);
     private translateService = inject(TranslateService);
+    private cdr = inject(ChangeDetectorRef);
 
     dataSource: MatTableDataSource<DataLakeConfigurationEntry> =
         new MatTableDataSource([]);
@@ -245,16 +253,25 @@ export class DatalakeConfigurationComponent implements 
OnInit {
     }
 
     openRetentionDialog(measurementId: string) {
-        this.dialogService.open(DataRetentionDialogComponent, {
-            panelType: PanelType.SLIDE_IN_PANEL,
-            title: this.translateService.instant('Set Data Retention'),
-            width: '50vw',
-            data: {
-                dataRetentionDialogModel: {
-                    measureName: measurementId,
+        const dialogRef: DialogRef<DataRetentionDialogComponent> =
+            this.dialogService.open(DataRetentionDialogComponent, {
+                panelType: PanelType.SLIDE_IN_PANEL,
+                title: this.translateService.instant('Set Data Retention'),
+                width: '50vw',
+                data: {
+                    dataRetentionDialogModel: {
+                        measureName: measurementId,
+                    },
+                    measurementIndex: measurementId,
                 },
-                measurementIndex: measurementId,
-            },
+            });
+
+        dialogRef.afterClosed().subscribe(data => {
+            if (data) {
+                setTimeout(() => {
+                    this.loadAvailableMeasurements();
+                }, 1000);
+            }
         });
     }
 
diff --git 
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
 
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
index b190cdd024..66028ccd7c 100644
--- 
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
+++ 
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.html
@@ -61,6 +61,7 @@
             mat-flat-button
             color="accent"
             data-cy="download-configuration-download-btn"
+            [disabled]="!disableDelete"
             (click)="deleteCleanUp()"
         >
             {{ 'Delete Sync' | translate }}
diff --git 
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
 
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
index 2db4496f78..5ccf811935 100644
--- 
a/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
+++ 
b/ui/src/app/configuration/dialog/data-retention-dialog/data-retention-dialog.component.ts
@@ -40,6 +40,8 @@ export class DataRetentionDialogComponent implements OnInit {
     @Input()
     measurementIndex: string;
 
+    disableDelete = false;
+
     dialogRef = inject(DialogRef<DataRetentionDialogComponent>);
     datalakeRestService = inject(DatalakeRestService);
 
@@ -52,6 +54,7 @@ export class DataRetentionDialogComponent implements OnInit {
                         measure?.retentionTime ||
                         measure.retentionTime != null
                     ) {
+                        this.disableDelete = true;
                         this.retentionConfig ??= measure.retentionTime;
                     } else {
                         this.retentionConfig ??= RetentionTimeConfig.fromData({
@@ -89,8 +92,13 @@ export class DataRetentionDialogComponent implements OnInit {
     setCleanUp() {
         this.datalakeRestService
             .cleanup(this.measurementIndex, this.retentionConfig)
-            .subscribe(data => {
-                this.close(true);
+            .subscribe({
+                next: data => {
+                    this.close(true);
+                },
+                error: err => {
+                    this.close(false);
+                },
             });
     }
 
@@ -112,6 +120,7 @@ export class DataRetentionDialogComponent implements OnInit 
{
             this.retentionConfig?.retentionExportConfig?.exportConfig;
         const providerId =
             this.retentionConfig?.retentionExportConfig?.exportProviderId;
+
         if (!exportConfig?.format) {
             console.error('Export format is required.');
             return false;
diff --git 
a/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
 
b/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
index 5305516f89..a9a2dd74f8 100644
--- 
a/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
+++ 
b/ui/src/app/configuration/dialog/export-provider-dialog/export-provider-dialog.component.ts
@@ -66,7 +66,7 @@ export class ExportProviderComponent implements OnInit {
             secretKey: ['', Validators.required],
             endPoint: ['', [Validators.required, this.uriValidator]],
             bucketName: ['', Validators.required],
-            awsRegion: ['US_EAST_1', Validators.required],
+            awsRegion: ['us-east-1', Validators.required],
             providerId: [''],
             secretEncrypted: [false],
         });

Reply via email to