This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e95cb4359 fix: Cancel file replay when adapter is stopped, fix
translations (#4122)
3e95cb4359 is described below
commit 3e95cb43596d332bf521e651dd480d9bd822f479
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Jan 21 17:10:27 2026 +0100
fix: Cancel file replay when adapter is stopped, fix translations (#4122)
Co-authored-by: Philipp Zehnder <[email protected]>
---
.../iiot/protocol/stream/FileReplayAdapter.java | 13 +++--
.../protocol/stream/FileReplayAdapterTest.java | 6 +--
ui/deployment/i18n/de.json | 5 +-
ui/deployment/i18n/en.json | 5 +-
.../src/lib/apis/datalake-rest.service.ts | 12 +++++
.../aggregate-configuration.component.html | 62 ++++++++++++----------
.../datalake-configuration.component.html | 12 ++---
.../delete-datalake-index-dialog.component.html | 10 +---
.../delete-datalake-index-dialog.component.ts | 29 ++++++----
9 files changed, 85 insertions(+), 69 deletions(-)
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java
index 80f411d383..b9cc860138 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java
@@ -172,7 +172,7 @@ public class FileReplayAdapter implements
StreamPipesAdapter {
TimeUnit.SECONDS
);
} else {
- executor.scheduleAtFixedRate(
+ executor.scheduleWithFixedDelay(
() -> getFileFromEndpointAndParseFile(extractor, collector,
adapterRuntimeContext),
0,
1,
@@ -293,6 +293,7 @@ public class FileReplayAdapter implements
StreamPipesAdapter {
InputStream inputStream,
IAdapterRuntimeContext adapterRuntimeContext
) {
+
// The parse method does not throw AdapterExceptions, that's why the
logging is handeled within the catch blog here
parser.parse(inputStream, (event) -> {
try {
@@ -301,6 +302,8 @@ public class FileReplayAdapter implements
StreamPipesAdapter {
adapterRuntimeContext
.getLogger()
.error(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
});
}
@@ -308,7 +311,7 @@ public class FileReplayAdapter implements
StreamPipesAdapter {
protected void processEvent(
IEventCollector collector,
Map<String, Object> event
- ) throws AdapterException {
+ ) throws AdapterException, InterruptedException {
long actualEventTimestamp = getTimestampFromEvent(event);
@@ -341,7 +344,7 @@ public class FileReplayAdapter implements
StreamPipesAdapter {
return actualEventTimestamp;
}
- private void reduceReplaySpeedIfRequired(long actualEventTimestamp) {
+ private void reduceReplaySpeedIfRequired(long actualEventTimestamp) throws
InterruptedException {
long sleepTime;
if (timestampLastEvent != -1 && actualEventTimestamp != -1) {
sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) /
speedUp);
@@ -350,11 +353,7 @@ public class FileReplayAdapter implements
StreamPipesAdapter {
}
// speed up is set to Float.MAX_VALUE when user selected fastest option
if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
- try {
Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("File stream adapter was stopped, the current replay is
interrupted", e);
- }
}
}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java
index 7b38d5fe45..0147e392bf 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java
@@ -61,7 +61,7 @@ class FileReplayAdapterTest {
@Test
- void processEvent_shouldCollectEventWhenTimestampIsLong() throws
AdapterException {
+ void processEvent_shouldCollectEventWhenTimestampIsLong() throws
AdapterException, InterruptedException {
event.put(TIMESTAMP, TIMESTAMP_VALUE);
fileReplayAdapter.processEvent(collector, event);
@@ -70,7 +70,7 @@ class FileReplayAdapterTest {
}
@Test
- void processEvent_shouldCollectEventWhenTimestampIsInteger() throws
AdapterException {
+ void processEvent_shouldCollectEventWhenTimestampIsInteger() throws
AdapterException, InterruptedException {
event.put(TIMESTAMP, 1622544682);
fileReplayAdapter.processEvent(collector, event);
@@ -102,4 +102,4 @@ class FileReplayAdapterTest {
assertEquals(TIMESTAMP_VALUE, actualEventTimestamp);
}
-}
\ No newline at end of file
+}
diff --git a/ui/deployment/i18n/de.json b/ui/deployment/i18n/de.json
index 2072cb3c40..15ad832443 100644
--- a/ui/deployment/i18n/de.json
+++ b/ui/deployment/i18n/de.json
@@ -826,6 +826,7 @@
"New Export Provider": "Neuer Exportanbieter",
"Truncate data": "Daten leeren",
"Delete data": "Daten löschen",
+ "Delete dataset": "Datensatz löschen",
"Delete Export Provider": "Exportanbieter löschen",
"Test Export Provider Connection": "Test der Export-Provider-Verbindung",
"Set Data Retention": "Speicherrichtlinie bearbeiten",
@@ -1040,5 +1041,7 @@
" of ": " von ",
" items ": " Einträge",
"Error Details": "Fehler-Details",
- "{{ widgetTitle }} Clone": "{{ widgetTitle }} Kopie"
+ "{{ widgetTitle }} Clone": "{{ widgetTitle }} Kopie",
+ "Do you really want to delete the dataset {{index}}?": "Möchten Sie den
Datensatz {{index}} wirklich löschen?",
+ "Do you really want to truncate the data in {{index}}?": "Möchten Sie alle
Daten im Datensatz {{index}} wirklich leeren?"
}
diff --git a/ui/deployment/i18n/en.json b/ui/deployment/i18n/en.json
index c892654e20..8506920172 100644
--- a/ui/deployment/i18n/en.json
+++ b/ui/deployment/i18n/en.json
@@ -1040,5 +1040,8 @@
" of ": null,
" items ": null,
"Error Details": null,
- "{{ widgetTitle }} Clone": "{{ widgetTitle }} Clone"
+ "{{ widgetTitle }} Clone": "{{ widgetTitle }} Clone",
+ "Do you really want to delete the dataset {{index}}?": "Do you really want
to delete the dataset {{index}}?",
+ "Do you really want to truncate the data in {{index}}?": "Do you really want
to truncate the data in {{index}}?",
+ "Delete dataset": null
}
diff --git
a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
index b119cd6296..0b89eecce9 100644
---
a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
@@ -197,6 +197,18 @@ export class DatalakeRestService {
return this.http.request(request);
}
+ store(
+ measureName: string,
+ spQueryResult: SpQueryResult,
+ ignoreSchemaMismatch = true,
+ ): Observable<void> {
+ return this.http.post<void>(
+ `${this.dataLakeUrl}/measurements/${measureName}`,
+ spQueryResult,
+ {},
+ );
+ }
+
toHttpParams(queryParamObject: any): HttpParams {
return new HttpParams({ fromObject: queryParamObject });
}
diff --git
a/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html
b/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html
index e786128334..9fd6a3014f 100644
---
a/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html
+++
b/ui/src/app/chart/components/chart-view/designer-panel/data-settings/aggregate-configuration/aggregate-configuration.component.html
@@ -23,36 +23,40 @@
fxLayoutAlign="start center"
class="ml-0 mr-5 form-field-small"
>
- <mat-form-field
- appearance="outline"
- fxFlex="100"
- class="w-100"
- color="accent"
- >
- <mat-select
- [(value)]="queryConfig.aggregationTimeUnit"
+ <sp-form-field [level]="3" [label]="'Unit' | translate" class="w-100">
+ <mat-form-field
+ appearance="outline"
+ fxFlex="100"
class="w-100"
- (selectionChange)="triggerDataRefresh()"
+ color="accent"
>
- @for (agg of availableAggregations; track agg) {
- <mat-option [value]="agg.value">{{ agg.label
}}</mat-option>
- }
- </mat-select>
- <mat-label> {{ 'Unit' | translate }}</mat-label>
- </mat-form-field>
- <mat-form-field
- appearance="outline"
- fxFlex="100"
- class="w-100"
- color="accent"
- >
- <input
- matInput
- type="number"
- [(ngModel)]="queryConfig.aggregationValue"
- (ngModelChange)="triggerDataRefresh()"
- />
- <mat-label>{{ 'Value' | translate }}</mat-label>
- </mat-form-field>
+ <mat-select
+ [(value)]="queryConfig.aggregationTimeUnit"
+ class="w-100"
+ (selectionChange)="triggerDataRefresh()"
+ >
+ @for (agg of availableAggregations; track agg) {
+ <mat-option [value]="agg.value">{{
+ agg.label
+ }}</mat-option>
+ }
+ </mat-select>
+ </mat-form-field>
+ </sp-form-field>
+ <sp-form-field [level]="3" [label]="'Value' | translate" class="w-100">
+ <mat-form-field
+ appearance="outline"
+ fxFlex="100"
+ class="w-100"
+ color="accent"
+ >
+ <input
+ matInput
+ type="number"
+ [(ngModel)]="queryConfig.aggregationValue"
+ (ngModelChange)="triggerDataRefresh()"
+ />
+ </mat-form-field>
+ </sp-form-field>
</div>
</div>
diff --git
a/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html
b/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html
index c181b22821..db6509a8b5 100644
---
a/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html
+++
b/ui/src/app/dataset/components/datalake-configuration/datalake-configuration.component.html
@@ -281,9 +281,7 @@
(click)="openDownloadDialog(element.name)"
>
<mat-icon>download</mat-icon>
- <span>{{
- 'Download data from index' | translate
- }}</span>
+ <span>{{ 'Download' | translate }}</span>
</button>
@if (writeAccess) {
<button
@@ -292,9 +290,7 @@
(click)="cleanDatalakeIndex(element.name)"
>
<mat-icon>local_fire_department</mat-icon>
- <span>{{
- 'Truncate all data from index' | translate
- }}</span>
+ <span>{{ 'Truncate data' | translate }}</span>
</button>
<button
mat-menu-item
@@ -303,9 +299,7 @@
(click)="deleteDatalakeIndex(element.name)"
>
<mat-icon>delete</mat-icon>
- <span>{{
- 'Remove index from database' | translate
- }}</span>
+ <span>{{ 'Delete dataset' | translate }}</span>
</button>
<button
mat-menu-item
diff --git
a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html
b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html
index 2973c26ca8..0e8c1a726d 100644
---
a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html
+++
b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.html
@@ -28,20 +28,14 @@
@if (deleteDialog) {
<b
><h4>
- {{
- confirmDeleteMessage
- | translate: { index: measurementIndex
}
- }}
+ {{ confirmDeleteMessage }}
</h4></b
>
}
@if (!deleteDialog) {
<b
><h4>
- {{
- confirmTruncateMessage
- | translate: { index: measurementIndex
}
- }}
+ {{ confirmTruncateMessage }}
</h4></b
>
}
diff --git
a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts
b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts
index 20041a0cb1..c2cf495b9c 100644
---
a/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts
+++
b/ui/src/app/dataset/dialog/delete-datalake-index/delete-datalake-index-dialog.component.ts
@@ -16,7 +16,7 @@
*
*/
-import { Component, Input } from '@angular/core';
+import { Component, inject, Input, OnInit } from '@angular/core';
import { DialogRef } from '@streampipes/shared-ui';
import { DatalakeRestService } from '@streampipes/platform-services';
import { TranslateService } from '@ngx-translate/core';
@@ -26,7 +26,7 @@ import { TranslateService } from '@ngx-translate/core';
templateUrl: './delete-datalake-index-dialog.component.html',
standalone: false,
})
-export class DeleteDatalakeIndexComponent {
+export class DeleteDatalakeIndexComponent implements OnInit {
@Input()
measurementIndex: string;
@@ -36,16 +36,23 @@ export class DeleteDatalakeIndexComponent {
isInProgress = false;
currentStatus: any;
- confirmDeleteMessage =
- 'Do you really want to delete the data index {{index}}?';
- confirmTruncateMessage =
- 'Do you really want to truncate the data in {{index}}?';
+ private dialogRef = inject(DialogRef<DeleteDatalakeIndexComponent>);
+ private datalakeRestService = inject(DatalakeRestService);
+ private translateService = inject(TranslateService);
- constructor(
- private dialogRef: DialogRef<DeleteDatalakeIndexComponent>,
- private datalakeRestService: DatalakeRestService,
- private translateService: TranslateService,
- ) {}
+ confirmDeleteMessage = '';
+ confirmTruncateMessage = '';
+
+ ngOnInit() {
+ this.confirmDeleteMessage = this.translateService.instant(
+ 'Do you really want to delete the dataset {{index}}?',
+ { index: this.measurementIndex },
+ );
+ this.confirmTruncateMessage = this.translateService.instant(
+ 'Do you really want to truncate the data in {{index}}?',
+ { index: this.measurementIndex },
+ );
+ }
close(refreshDataLakeIndex: boolean) {
this.dialogRef.close(refreshDataLakeIndex);