This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch fix-replay-adapter in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 54793f14e5c6baa3ea4d2ba999919fc725cc1d70 Author: Dominik Riemer <[email protected]> AuthorDate: Wed Jan 21 11:36:52 2026 +0100 fix: Cancel file replay when adapter is stopped, fix translations --- .../iiot/protocol/stream/FileReplayAdapter.java | 13 +++-- ui/deployment/i18n/de.json | 5 +- ui/deployment/i18n/en.json | 5 +- .../src/lib/apis/datalake-rest.service.ts | 12 +++++ .../pipeline-element-runtime-info.component.ts | 1 + .../aggregate-configuration.component.html | 62 ++++++++++++---------- .../datalake-configuration.component.html | 12 ++--- .../delete-datalake-index-dialog.component.html | 10 +--- .../delete-datalake-index-dialog.component.ts | 29 ++++++---- 9 files changed, 83 insertions(+), 66 deletions(-) diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java index 80f411d383..b9cc860138 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java @@ -172,7 +172,7 @@ public class FileReplayAdapter implements StreamPipesAdapter { TimeUnit.SECONDS ); } else { - executor.scheduleAtFixedRate( + executor.scheduleWithFixedDelay( () -> getFileFromEndpointAndParseFile(extractor, collector, adapterRuntimeContext), 0, 1, @@ -293,6 +293,7 @@ public class FileReplayAdapter implements StreamPipesAdapter { InputStream inputStream, IAdapterRuntimeContext adapterRuntimeContext ) { + // The parse method does not throw AdapterExceptions, that's why the logging is handeled within the catch blog here parser.parse(inputStream, (event) -> { try { @@ -301,6 +302,8 @@ public class FileReplayAdapter implements StreamPipesAdapter { adapterRuntimeContext .getLogger() .error(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); } }); } @@ -308,7 +311,7 @@ public class FileReplayAdapter implements StreamPipesAdapter { protected void processEvent( IEventCollector collector, Map<String, Object> event - ) throws AdapterException { + ) throws AdapterException, InterruptedException { long actualEventTimestamp = getTimestampFromEvent(event); @@ -341,7 +344,7 @@ public class FileReplayAdapter implements StreamPipesAdapter { return actualEventTimestamp; } - private void reduceReplaySpeedIfRequired(long actualEventTimestamp) { + private void reduceReplaySpeedIfRequired(long actualEventTimestamp) throws InterruptedException { long sleepTime; if (timestampLastEvent != -1 && actualEventTimestamp != -1) { sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp); @@ -350,11 +353,7 @@ public class FileReplayAdapter implements StreamPipesAdapter { } // speed up is set to Float.MAX_VALUE when user selected fastest option if (sleepTime > 0 && speedUp != Float.MAX_VALUE) { - try { Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("File stream adapter was stopped, the current replay is interrupted", e); - } } } diff --git a/ui/deployment/i18n/de.json b/ui/deployment/i18n/de.json index 2072cb3c40..15ad832443 100644 --- a/ui/deployment/i18n/de.json +++ b/ui/deployment/i18n/de.json @@ -826,6 +826,7 @@ "New Export Provider": "Neuer Exportanbieter", "Truncate data": "Daten leeren", "Delete data": "Daten löschen", + "Delete dataset": "Datensatz löschen", "Delete Export Provider": "Exportanbieter löschen", "Test Export Provider Connection": "Test der Export-Provider-Verbindung", "Set Data Retention": "Speicherrichtlinie bearbeiten", @@ -1040,5 +1041,7 @@ " of ": " von ", " items ": " Einträge", "Error Details": "Fehler-Details", - "{{ widgetTitle }} Clone": "{{ widgetTitle }} Kopie" + "{{ widgetTitle }} Clone": "{{ widgetTitle }} Kopie", + "Do you really want to delete the dataset {{index}}?": "Möchten Sie den Datensatz {{index}} wirklich löschen?", + "Do you really want to truncate the data in {{index}}?": "Möchten Sie alle Daten im Datensatz {{index}} wirklich leeren?" } diff --git a/ui/deployment/i18n/en.json b/ui/deployment/i18n/en.json index c892654e20..8506920172 100644 --- a/ui/deployment/i18n/en.json +++ b/ui/deployment/i18n/en.json @@ -1040,5 +1040,8 @@ " of ": null, " items ": null, "Error Details": null, - "{{ widgetTitle }} Clone": "{{ widgetTitle }} Clone" + "{{ widgetTitle }} Clone": "{{ widgetTitle }} Clone", + "Do you really want to delete the dataset {{index}}?": "Do you really want to delete the dataset {{index}}?", + "Do you really want to truncate the data in {{index}}?": "Do you really want to truncate the data in {{index}}?", + "Delete dataset": null } diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts index b119cd6296..0b89eecce9 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts @@ -197,6 +197,18 @@ export class DatalakeRestService { return this.http.request(request); } + store( + measureName: string, + spQueryResult: SpQueryResult, + ignoreSchemaMismatch = true, + ): Observable<void> { + return this.http.post<void>( + `${this.dataLakeUrl}/measurements/${measureName}`, + spQueryResult, + {}, + ); + } + toHttpParams(queryParamObject: any): HttpParams { return new HttpParams({ fromObject: queryParamObject }); } diff --git a/ui/projects/streampipes/shared-ui/src/lib/components/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts b/ui/projects/streampipes/shared-ui/src/lib/components/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts index 6427c501d2..03a9bbfd1a 100644 --- a/ui/projects/streampipes/shared-ui/src/lib/components/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts +++ b/ui/projects/streampipes/shared-ui/src/lib/components/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts @@ -100,6 +100,7 @@ export class PipelineElementRuntimeInfoComponent implements OnInit, OnDestroy { const [firstKey] = Object.keys(responseJson); const json = responseJson[firstKey]; this.runtimeDataError = !json; + console.log(this.runtimeDataError); this.runtimeInfo.forEach(r => { const previousValue = r.value; r.value = json[r.runtimeName]; diff --git a/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html b/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html index e786128334..9fd6a3014f 100644 --- a/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html +++ b/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html @@ -23,36 +23,40 @@ fxLayoutAlign="start center" class="ml-0 mr-5 form-field-small" > - <mat-form-field - appearance="outline" - fxFlex="100" - class="w-100" - color="accent" - > - <mat-select - [(value)]="queryConfig.aggregationTimeUnit" + <sp-form-field [level]="3" [label]="'Unit' | translate" class="w-100"> + <mat-form-field + appearance="outline" + fxFlex="100" class="w-100" - (selectionChange)="triggerDataRefresh()" + color="accent" > - @for (agg of availableAggregations; track agg) { - <mat-option [value]="agg.value">{{ agg.label }}</mat-option> - } - </mat-select> - <mat-label> {{ 'Unit' | translate }}</mat-label> - </mat-form-field> - <mat-form-field - appearance="outline" - fxFlex="100" - class="w-100" - color="accent" - > - <input - matInput - type="number" - [(ngModel)]="queryConfig.aggregationValue" - (ngModelChange)="triggerDataRefresh()" - /> - <mat-label>{{ 'Value' | translate }}</mat-label> - </mat-form-field> + <mat-select + [(value)]="queryConfig.aggregationTimeUnit" + class="w-100" + (selectionChange)="triggerDataRefresh()" + > + @for (agg of availableAggregations; track agg) { + <mat-option [value]="agg.value">{{ + agg.label + }}</mat-option> + } + </mat-select> + </mat-form-field> + </sp-form-field> + <sp-form-field [level]="3" [label]="'Value' | translate" class="w-100"> + <mat-form-field + appearance="outline" + fxFlex="100" + class="w-100" + color="accent" + > + <input + matInput + type="number" + [(ngModel)]="queryConfig.aggregationValue" + (ngModelChange)="triggerDataRefresh()" + /> + </mat-form-field> + </sp-form-field> </div> </div> diff --git a/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html b/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html index c181b22821..db6509a8b5 100644 --- a/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html +++ b/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html @@ -281,9 +281,7 @@ (click)="openDownloadDialog(element.name)" > <mat-icon>download</mat-icon> - <span>{{ - 'Download data from index' | translate - }}</span> + <span>{{ 'Download' | translate }}</span> </button> @if (writeAccess) { <button @@ -292,9 +290,7 @@ (click)="cleanDatalakeIndex(element.name)" > <mat-icon>local_fire_department</mat-icon> - <span>{{ - 'Truncate all data from index' | translate - }}</span> + <span>{{ 'Truncate data' | translate }}</span> </button> <button mat-menu-item @@ -303,9 +299,7 @@ (click)="deleteDatalakeIndex(element.name)" > <mat-icon>delete</mat-icon> - <span>{{ - 'Remove index from database' | translate - }}</span> + <span>{{ 'Delete dataset' | translate }}</span> </button> <button mat-menu-item diff --git a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html index 2973c26ca8..0e8c1a726d 100644 --- a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html +++ b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html @@ -28,20 +28,14 @@ @if (deleteDialog) { <b ><h4> - {{ - confirmDeleteMessage - | translate: { index: measurementIndex } - }} + {{ confirmDeleteMessage }} </h4></b > } @if (!deleteDialog) { <b ><h4> - {{ - confirmTruncateMessage - | translate: { index: measurementIndex } - }} + {{ confirmTruncateMessage }} </h4></b > } diff --git a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts index 20041a0cb1..c2cf495b9c 100644 --- a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts +++ b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts @@ -16,7 +16,7 @@ * */ -import { Component, Input } from '@angular/core'; +import { Component, inject, Input, OnInit } from '@angular/core'; import { DialogRef } from '@streampipes/shared-ui'; import { DatalakeRestService } from '@streampipes/platform-services'; import { TranslateService } from '@ngx-translate/core'; @@ -26,7 +26,7 @@ import { TranslateService } from '@ngx-translate/core'; templateUrl: './delete-datalake-index-dialog.component.html', standalone: false, }) -export class DeleteDatalakeIndexComponent { +export class DeleteDatalakeIndexComponent implements OnInit { @Input() measurementIndex: string; @@ -36,16 +36,23 @@ export class DeleteDatalakeIndexComponent { isInProgress = false; currentStatus: any; - confirmDeleteMessage = - 'Do you really want to delete the data index {{index}}?'; - confirmTruncateMessage = - 'Do you really want to truncate the data in {{index}}?'; + private dialogRef = inject(DialogRef<DeleteDatalakeIndexComponent>); + private datalakeRestService = inject(DatalakeRestService); + private translateService = inject(TranslateService); - constructor( - private dialogRef: DialogRef<DeleteDatalakeIndexComponent>, - private datalakeRestService: DatalakeRestService, - private translateService: TranslateService, - ) {} + confirmDeleteMessage = ''; + confirmTruncateMessage = ''; + + ngOnInit() { + this.confirmDeleteMessage = this.translateService.instant( + 'Do you really want to delete the dataset {{index}}?', + { index: this.measurementIndex }, + ); + this.confirmTruncateMessage = this.translateService.instant( + 'Do you really want to truncate the data in {{index}}?', + { index: this.measurementIndex }, + ); + } close(refreshDataLakeIndex: boolean) { this.dialogRef.close(refreshDataLakeIndex);
