This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new ff334672d1 fix(#3531): Add option to reset an invalid adapter state in API and UI (#3532)
ff334672d1 is described below

commit ff334672d14e131bac970e2c89b861710bc913da
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Mar 19 08:29:52 2025 +0100

    fix(#3531): Add option to reset an invalid adapter state in API and UI (#3532)
    
    * fix(#3531): Add option to reset an invalid adapter state in API and UI
    
    * Force stop in adapter resolver
---
 .../management/health/AdapterHealthCheck.java      |  2 +-
 .../management/AdapterMasterManagement.java        | 17 ++++++--
 .../management/AdapterUpdateManagement.java        |  2 +-
 .../export/resolver/AdapterResolver.java           |  2 +-
 .../rest/impl/connect/AdapterResource.java         |  9 ++--
 .../src/lib/apis/adapter.service.ts                | 11 ++++-
 .../exception-details-dialog.component.html        | 12 +++---
 .../exception-details-dialog.component.ts          | 10 ++++-
 .../existing-adapters.component.ts                 | 50 ++++++++++++++--------
 9 files changed, 77 insertions(+), 38 deletions(-)

diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
index e21d29de0d..b992aa7aac 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
@@ -118,7 +118,7 @@ public class AdapterHealthCheck implements Runnable {
                                       adapterDescription.getElementId(),
                                       adapterDescription.getName()
                                   ));
-    LOG.info("Monitoring {} adapter instances", adapterMetrics.size());
+    LOG.debug("Monitoring {} adapter instances", adapterMetrics.size());
   }
 
   private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, 
String adapterId, String adapterName) {
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 79310b96b1..9b1002c527 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -125,7 +125,7 @@ public class AdapterMasterManagement {
 
     // Stop stream adapter
     try {
-      stopStreamAdapter(elementId);
+      stopStreamAdapter(elementId, true);
     } catch (AdapterException e) {
       LOG.info("Could not stop adapter: " + elementId, e);
     }
@@ -145,10 +145,21 @@ public class AdapterMasterManagement {
     return adapterInstanceStorage.findAll();
   }
 
-  public void stopStreamAdapter(String elementId) throws AdapterException {
+  public void stopStreamAdapter(String elementId,
+                                boolean forceStop) throws AdapterException {
     AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
 
-    WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+    try {
+      WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+    } catch (AdapterException e) {
+      if (!forceStop) {
+        throw new AdapterException("Could not stop adapter", e);
+      } else {
+        ad.setRunning(false);
+        ad.setSelectedEndpointUrl(null);
+        adapterInstanceStorage.updateElement(ad);
+      }
+    }
     ExtensionsLogProvider.INSTANCE.reset(elementId);
 
     // remove the adapter from the metrics manager so that
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index b64b661495..350c71609a 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -65,7 +65,7 @@ public class AdapterUpdateManagement {
     boolean shouldRestart = ad.isRunning();
 
     if (ad.isRunning()) {
-      this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
+      this.adapterMasterManagement.stopStreamAdapter(ad.getElementId(), true);
     }
 
     // update data source in database
diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index 0fdb6329d3..a6913d02a2 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -91,7 +91,7 @@ public class AdapterResolver extends 
AbstractResolver<AdapterDescription> {
               new SpResourceManager().manageAdapters(),
               new SpResourceManager().manageDataStreams(),
               AdapterMetricsManager.INSTANCE.getAdapterMetrics()
-          ).stopStreamAdapter(resourceId);
+          ).stopStreamAdapter(resourceId, true);
         } catch (AdapterException e) {
           LOG.warn("Error when stopping adapter with id {} and name {}", 
resourceId, existingAdapter.getName());
         }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index b0a7b2c8e6..cf046735b1 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -186,10 +186,11 @@ public class AdapterResource extends 
AbstractAdapterResource<AdapterMasterManage
 
   @PostMapping(path = "/{id}/stop", produces = 
MediaType.APPLICATION_JSON_VALUE)
   @PreAuthorize("this.hasWriteAuthority() and hasPermission('#elementId', 
'WRITE')")
-  public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId) {
+  public ResponseEntity<?> stopAdapter(@PathVariable("id") String elementId,
+                                       @RequestParam(value = "forceStop", 
defaultValue = "false") boolean forceStop) {
     try {
-      managementService.stopStreamAdapter(elementId);
-      return ok(Notifications.success("Adapter started"));
+      managementService.stopStreamAdapter(elementId, forceStop);
+      return ok(Notifications.success("Adapter stopped"));
     } catch (AdapterException e) {
       LOG.error("Could not stop adapter with id {}", elementId, e);
       return serverError(SpLogMessage.from(e));
@@ -201,7 +202,7 @@ public class AdapterResource extends 
AbstractAdapterResource<AdapterMasterManage
   public ResponseEntity<?> startAdapter(@PathVariable("id") String elementId) {
     try {
       managementService.startStreamAdapter(elementId);
-      return ok(Notifications.success("Adapter stopped"));
+      return ok(Notifications.success("Adapter started"));
     } catch (AdapterException e) {
       LOG.error("Could not start adapter with id {}", elementId, e);
       return serverError(SpLogMessage.from(e));
diff --git 
a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts 
b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
index ff32b390f4..fcd293a0ff 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
@@ -75,9 +75,16 @@ export class AdapterService {
         );
     }
 
-    stopAdapter(adapter: AdapterDescription): Observable<Message> {
+    stopAdapter(
+        adapter: AdapterDescription,
+        forceStop = false,
+    ): Observable<Message> {
         return this.http
-            .post(this.adapterMasterUrl + adapter.elementId + '/stop', {})
+            .post(
+                this.adapterMasterUrl + adapter.elementId + '/stop',
+                {},
+                { params: { forceStop } },
+            )
             .pipe(map(response => Message.fromData(response as any)));
     }
 
diff --git 
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
 
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
index 990888d2a3..fcee105bd6 100644
--- 
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
+++ 
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.html
@@ -23,12 +23,12 @@
     </div>
     <mat-divider></mat-divider>
     <div class="sp-dialog-actions actions-align-right">
-        <button
-            mat-button
-            mat-raised-button
-            class="mat-basic"
-            (click)="close()"
-        >
+        @if (additionalButton) {
+            <button mat-raised-button color="accent" (click)="close(true)">
+                {{ additionalButtonText }}
+            </button>
+        }
+        <button mat-raised-button class="mat-basic" (click)="close()">
             Close
         </button>
     </div>
diff --git 
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
 
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
index b987f2f866..7929cd6510 100644
--- 
a/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
+++ 
b/ui/projects/streampipes/shared-ui/src/lib/components/sp-exception-message/exception-details-dialog/exception-details-dialog.component.ts
@@ -35,12 +35,18 @@ export class SpExceptionDetailsDialogComponent implements 
OnInit {
     @Input()
     title: string;
 
+    @Input()
+    additionalButton = false;
+
+    @Input()
+    additionalButtonText = 'Button';
+
     constructor(
         private dialogRef: DialogRef<SpExceptionDetailsDialogComponent>,
     ) {}
 
-    close() {
-        this.dialogRef.close();
+    close(additionalButtonClicked = false) {
+        this.dialogRef.close(additionalButtonClicked);
     }
 
     ngOnInit(): void {}
diff --git 
a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
 
b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
index 56c6f560d9..bf409cbadd 100644
--- 
a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
+++ 
b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts
@@ -85,6 +85,9 @@ export class ExistingAdaptersComponent implements OnInit, 
OnDestroy {
     tutorialActiveSubscription: Subscription;
     currentFilterIds: Set<string> = new Set<string>();
 
+    startAdapterErrorText = 'Could not start adapter';
+    stopAdapterErrorText = 'Could not stop adapter';
+
     constructor(
         private adapterService: AdapterService,
         private dialogService: DialogService,
@@ -119,24 +122,18 @@ export class ExistingAdaptersComponent implements OnInit, 
OnDestroy {
                 this.getAdaptersRunning();
             },
             error => {
-                this.openAdapterStatusErrorDialog(
-                    error.error,
-                    'Could not start adapter',
-                );
+                this.openAdapterStatusErrorDialog(adapter, error.error, true);
             },
         );
     }
 
-    stopAdapter(adapter: AdapterDescription) {
-        this.adapterService.stopAdapter(adapter).subscribe(
+    stopAdapter(adapter: AdapterDescription, forceStop = false) {
+        this.adapterService.stopAdapter(adapter, forceStop).subscribe(
             _ => {
                 this.getAdaptersRunning();
             },
             error => {
-                this.openAdapterStatusErrorDialog(
-                    error.error,
-                    'Could not stop adapter',
-                );
+                this.openAdapterStatusErrorDialog(adapter, error.error, false);
             },
         );
     }
@@ -170,15 +167,32 @@ export class ExistingAdaptersComponent implements OnInit, 
OnDestroy {
         });
     }
 
-    openAdapterStatusErrorDialog(message: SpLogMessage, title: string) {
-        this.dialogService.open(SpExceptionDetailsDialogComponent, {
-            panelType: PanelType.STANDARD_PANEL,
-            title: 'Adapter Status',
-            width: '70vw',
-            data: {
-                message: message,
-                title: title,
+    openAdapterStatusErrorDialog(
+        adapter: AdapterDescription,
+        message: SpLogMessage,
+        startAction: boolean,
+    ) {
+        const title = startAction
+            ? this.startAdapterErrorText
+            : this.stopAdapterErrorText;
+        const dialogRef = this.dialogService.open(
+            SpExceptionDetailsDialogComponent,
+            {
+                panelType: PanelType.STANDARD_PANEL,
+                title: 'Adapter Status',
+                width: '70vw',
+                data: {
+                    message: message,
+                    title: title,
+                    additionalButton: !startAction,
+                    additionalButtonText: 'Reset adapter state',
+                },
             },
+        );
+        dialogRef.afterClosed().subscribe(forceStop => {
+            if (forceStop) {
+                this.stopAdapter(adapter, true);
+            }
         });
     }
 

Reply via email to