This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new ff334672d1 fix(#3531): Add option to reset an invalid adapter state in
API and UI (#3532)
ff334672d1 is described below
commit ff334672d14e131bac970e2c89b861710bc913da
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Mar 19 08:29:52 2025 +0100
fix(#3531): Add option to reset an invalid adapter state in API and UI
(#3532)
* fix(#3531): Add option to reset an invalid adapter state in API and UI
* Force stop in adapter resolver
---
.../management/health/AdapterHealthCheck.java | 2 +-
.../management/AdapterMasterManagement.java | 17 ++++++--
.../management/AdapterUpdateManagement.java | 2 +-
.../export/resolver/AdapterResolver.java | 2 +-
.../rest/impl/connect/AdapterResource.java | 9 ++--
.../src/lib/apis/adapter.service.ts | 11 ++++-
.../exception-details-dialog.component.html | 12 +++---
.../exception-details-dialog.component.ts | 10 ++++-
.../existing-adapters.component.ts | 50 ++++++++++++++--------
9 files changed, 77 insertions(+), 38 deletions(-)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
index e21d29de0d..b992aa7aac 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
@@ -118,7 +118,7 @@ public class AdapterHealthCheck implements Runnable {
adapterDescription.getElementId(),
adapterDescription.getName()
));
- LOG.info("Monitoring {} adapter instances", adapterMetrics.size());
+ LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
}
private void updateTotalEventsPublished(AdapterMetrics adapterMetrics,
String adapterId, String adapterName) {
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 79310b96b1..9b1002c527 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -125,7 +125,7 @@ public class AdapterMasterManagement {
// Stop stream adapter
try {
- stopStreamAdapter(elementId);
+ stopStreamAdapter(elementId, true);
} catch (AdapterException e) {
LOG.info("Could not stop adapter: " + elementId, e);
}
@@ -145,10 +145,21 @@ public class AdapterMasterManagement {
return adapterInstanceStorage.findAll();
}
- public void stopStreamAdapter(String elementId) throws AdapterException {
+ public void stopStreamAdapter(String elementId,
+ boolean forceStop) throws AdapterException {
AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
- WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+ try {
+ WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+ } catch (AdapterException e) {
+ if (!forceStop) {
+ throw new AdapterException("Could not stop adapter", e);
+ } else {
+ ad.setRunning(false);
+ ad.setSelectedEndpointUrl(null);
+ adapterInstanceStorage.updateElement(ad);
+ }
+ }
ExtensionsLogProvider.INSTANCE.reset(elementId);
// remove the adapter from the metrics manager so that
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index b64b661495..350c71609a 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -65,7 +65,7 @@ public class AdapterUpdateManagement {
boolean shouldRestart = ad.isRunning();
if (ad.isRunning()) {
- this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
+ this.adapterMasterManagement.stopStreamAdapter(ad.getElementId(), true);
}
// update data source in database
diff --git
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index 0fdb6329d3..a6913d02a2 100644
---
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -91,7 +91,7 @@ public class AdapterResolver extends
AbstractResolver<AdapterDescription> {
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
AdapterMetricsManager.INSTANCE.getAdapterMetrics()
- ).stopStreamAdapter(resourceId);
+ ).stopStreamAdapter(resourceId, true);
} catch (AdapterException e) {
LOG.warn("Error when stopping adapter with id {} and name {}",
resourceId, existingAdapter.getName());
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index b0a7b2c8e6..cf046735b1 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -186,10 +186,11 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
@PostMapping(path = "/{id}/stop", produces =
MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId',
'WRITE')")
- public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId) {
+ public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId,
+ @RequestParam(value = "forceStop",
defaultValue = "false") boolean forceStop) {
try {
- managementService.stopStreamAdapter(elementId);
- return ok(Notifications.success("Adapter started"));
+ managementService.stopStreamAdapter(elementId, forceStop);
+ return ok(Notifications.success("Adapter stopped"));
} catch (AdapterException e) {
LOG.error("Could not stop adapter with id {}", elementId, e);
return serverError(SpLogMessage.from(e));
@@ -201,7 +202,7 @@ public class AdapterResource extends
AbstractAdapterResource<AdapterMasterManage
public ResponseEntity<?> startAdapter(@PathVariable("id") String elementId) {
try {
managementService.startStreamAdapter(elementId);
- return ok(Notifications.success("Adapter stopped"));
+ return ok(Notifications.success("Adapter started"));
} catch (AdapterException e) {
LOG.error("Could not start adapter with id {}", elementId, e);
return serverError(SpLogMessage.from(e));
diff --git
a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
index ff32b390f4..fcd293a0ff 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
@@ -75,9 +75,16 @@ export class AdapterService {
);
}
- stopAdapter(adapter: AdapterDescription): Observable<Message> {
+ stopAdapter(
+ adapter: AdapterDescription,
+ forceStop = false,
+ ): Observable<Message> {
return this.http
- .post(this.adapterMasterUrl + adapter.elementId + '/stop', {})
+ .post(
+ this.adapterMasterUrl + adapter.elementId + '/stop',
+ {},
+ { params: { forceStop } },
+ )
.pipe(map(response => Message.fromData(response as any)));
}
diff --git
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
index 990888d2a3..fcee105bd6 100644
---
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
+++
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
@@ -23,12 +23,12 @@
</div>
<mat-divider></mat-divider>
<div class="sp-dialog-actions actions-align-right">
- <button
- mat-button
- mat-raised-button
- class="mat-basic"
- (click)="close()"
- >
+ @if (additionalButton) {
+ <button mat-raised-button color="accent" (click)="close(true)">
+ {{ additionalButtonText }}
+ </button>
+ }
+ <button mat-raised-button class="mat-basic" (click)="close()">
Close
</button>
</div>
diff --git
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
index b987f2f866..7929cd6510 100644
---
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
+++
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
@@ -35,12 +35,18 @@ export class SpExceptionDetailsDialogComponent implements
OnInit {
@Input()
title: string;
+ @Input()
+ additionalButton = false;
+
+ @Input()
+ additionalButtonText = 'Button';
+
constructor(
private dialogRef: DialogRef<SpExceptionDetailsDialogComponent>,
) {}
- close() {
- this.dialogRef.close();
+ close(additionalButtonClicked = false) {
+ this.dialogRef.close(additionalButtonClicked);
}
ngOnInit(): void {}
diff --git
a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
index 56c6f560d9..bf409cbadd 100644
---
a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
+++
b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
@@ -85,6 +85,9 @@ export class ExistingAdaptersComponent implements OnInit,
OnDestroy {
tutorialActiveSubscription: Subscription;
currentFilterIds: Set<string> = new Set<string>();
+ startAdapterErrorText = 'Could not start adapter';
+ stopAdapterErrorText = 'Could not stop adapter';
+
constructor(
private adapterService: AdapterService,
private dialogService: DialogService,
@@ -119,24 +122,18 @@ export class ExistingAdaptersComponent implements OnInit,
OnDestroy {
this.getAdaptersRunning();
},
error => {
- this.openAdapterStatusErrorDialog(
- error.error,
- 'Could not start adapter',
- );
+ this.openAdapterStatusErrorDialog(adapter, error.error, true);
},
);
}
- stopAdapter(adapter: AdapterDescription) {
- this.adapterService.stopAdapter(adapter).subscribe(
+ stopAdapter(adapter: AdapterDescription, forceStop = false) {
+ this.adapterService.stopAdapter(adapter, forceStop).subscribe(
_ => {
this.getAdaptersRunning();
},
error => {
- this.openAdapterStatusErrorDialog(
- error.error,
- 'Could not stop adapter',
- );
+ this.openAdapterStatusErrorDialog(adapter, error.error, false);
},
);
}
@@ -170,15 +167,32 @@ export class ExistingAdaptersComponent implements OnInit,
OnDestroy {
});
}
- openAdapterStatusErrorDialog(message: SpLogMessage, title: string) {
- this.dialogService.open(SpExceptionDetailsDialogComponent, {
- panelType: PanelType.STANDARD_PANEL,
- title: 'Adapter Status',
- width: '70vw',
- data: {
- message: message,
- title: title,
+ openAdapterStatusErrorDialog(
+ adapter: AdapterDescription,
+ message: SpLogMessage,
+ startAction: boolean,
+ ) {
+ const title = startAction
+ ? this.startAdapterErrorText
+ : this.stopAdapterErrorText;
+ const dialogRef = this.dialogService.open(
+ SpExceptionDetailsDialogComponent,
+ {
+ panelType: PanelType.STANDARD_PANEL,
+ title: 'Adapter Status',
+ width: '70vw',
+ data: {
+ message: message,
+ title: title,
+ additionalButton: !startAction,
+ additionalButtonText: 'Reset adapter state',
+ },
},
+ );
+ dialogRef.afterClosed().subscribe(forceStop => {
+ if (forceStop) {
+ this.stopAdapter(adapter, true);
+ }
});
}