mcgilman commented on code in PR #11112:
URL: https://github.com/apache/nifi/pull/11112#discussion_r3047045589


##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/state/connectors-listing/connectors-listing.reducer.ts:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { createReducer, on } from '@ngrx/store';
+import { ConnectorsListingState } from '../index';
+import {
+    cancelConnectorDrain,
+    cancelConnectorDrainSuccess,
+    connectorsListingBannerApiError,
+    createConnector,
+    createConnectorSuccess,
+    deleteConnector,
+    deleteConnectorSuccess,
+    discardConnectorConfig,
+    discardConnectorConfigSuccess,
+    drainConnector,
+    drainConnectorSuccess,
+    loadConnectorsListing,
+    loadConnectorsListingSuccess,
+    renameConnector,
+    renameConnectorApiError,
+    renameConnectorSuccess,
+    resetConnectorsListingState,
+    startConnector,
+    startConnectorSuccess,
+    stopConnector,
+    stopConnectorSuccess
+} from './connectors-listing.actions';
+
+export const initialState: ConnectorsListingState = {
+    connectors: [],
+    saving: false,
+    loadedTimestamp: '',
+    status: 'pending'
+};
+
+export const connectorsListingReducer = createReducer(
+    initialState,
+
+    on(resetConnectorsListingState, () => ({
+        ...initialState
+    })),
+
+    on(loadConnectorsListing, (state) => ({
+        ...state,
+        status: 'loading' as const
+    })),
+
+    on(loadConnectorsListingSuccess, (state, { response }) => ({
+        ...state,
+        connectors: response.connectors,
+        loadedTimestamp: response.loadedTimestamp,
+        status: 'success' as const
+    })),
+
+    on(createConnector, (state) => ({
+        ...state,
+        saving: true
+    })),
+
+    on(createConnectorSuccess, (state, { response }) => ({
+        ...state,
+        connectors: [...state.connectors, response.connector],
+        saving: false
+    })),
+
+    on(deleteConnector, (state) => ({
+        ...state,
+        saving: true
+    })),
+
+    on(deleteConnectorSuccess, (state, { response }) => ({
+        ...state,
+        connectors: state.connectors.filter((c) => c.id !== 
response.connector.id),
+        saving: false
+    })),
+
+    on(startConnector, (state) => ({
+        ...state,
+        saving: true
+    })),
+
+    on(startConnectorSuccess, (state, { response }) => ({
+        ...state,
+        connectors: state.connectors.map((c) => (c.id === 
response.connector.id ? response.connector : c)),
+        saving: false
+    })),
+
+    on(stopConnector, (state) => ({
+        ...state,
+        saving: true
+    })),
+
+    on(stopConnectorSuccess, (state, { response }) => ({
+        ...state,
+        connectors: state.connectors.map((c) => (c.id === 
response.connector.id ? response.connector : c)),
+        saving: false
+    })),
+
+    on(renameConnector, (state) => ({
+        ...state,
+        saving: true
+    })),
+
+    on(renameConnectorSuccess, (state, { response }) => ({
+        ...state,
+        connectors: state.connectors.map((c) => (c.id === 
response.connector.id ? response.connector : c)),
+        saving: false
+    })),
+
+    on(renameConnectorApiError, (state) => ({
+        ...state,
+        saving: false
+    })),
+
+    on(connectorsListingBannerApiError, (state) => ({
+        ...state,
+        saving: false,
+        status: 'success' as const

Review Comment:
   Setting `status` to `success` when an API error occurs doesn't seem right. 
This should be covered by the fix for my previous comment.



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/state/connectors-listing/connectors-listing.effects.ts:
##########
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Actions, createEffect, ofType } from '@ngrx/effects';
+import { Store } from '@ngrx/store';
+import { MatDialog } from '@angular/material/dialog';
+import { Router } from '@angular/router';
+import { catchError, from, map, of, switchMap, take, tap } from 'rxjs';
+import { HttpErrorResponse } from '@angular/common/http';
+import { LARGE_DIALOG, MEDIUM_DIALOG, SMALL_DIALOG, YesNoDialog } from 
'@nifi/shared';
+import { NiFiState } from '../../../../state';
+import { ConnectorService } from '../../service/connector.service';
+import { ErrorHelper } from '../../../../service/error-helper.service';
+import { Client } from '../../../../service/client.service';
+import { CreateConnector } from 
'../../ui/create-connector/create-connector.component';
+import { RenameConnectorDialog } from 
'../../ui/rename-connector-dialog/rename-connector-dialog.component';
+import { selectSaving } from './connectors-listing.selectors';
+import { DocumentedType } from '../../../../state/shared';
+import * as ErrorActions from '../../../../state/error/error.actions';
+import { ErrorContextKey } from '../../../../state/error';
+import {
+    cancelConnectorDrain,
+    cancelConnectorDrainSuccess,
+    connectorsListingBannerApiError,
+    createConnector,
+    createConnectorSuccess,
+    deleteConnector,
+    deleteConnectorSuccess,
+    discardConnectorConfig,
+    discardConnectorConfigSuccess,
+    drainConnector,
+    drainConnectorSuccess,
+    loadConnectorsListing,
+    loadConnectorsListingSuccess,
+    navigateToConfigureConnector,
+    navigateToManageAccessPolicies,
+    navigateToViewConnector,
+    navigateToViewConnectorDetails,
+    openNewConnectorDialog,
+    openRenameConnectorDialog,
+    promptConnectorDeletion,
+    promptDiscardConnectorConfig,
+    promptDrainConnector,
+    renameConnector,
+    renameConnectorApiError,
+    renameConnectorSuccess,
+    selectConnector,
+    startConnector,
+    startConnectorSuccess,
+    stopConnector,
+    stopConnectorSuccess
+} from './connectors-listing.actions';
+import { RenameConnectorRequest } from '../index';
+import { BackNavigation } from '../../../../state/navigation';
+
+@Injectable()
+export class ConnectorsListingEffects {
+    private actions$ = inject(Actions);
+    private store = inject<Store<NiFiState>>(Store);
+    private connectorService = inject(ConnectorService);
+    private errorHelper = inject(ErrorHelper);
+    private client = inject(Client);
+    private dialog = inject(MatDialog);
+    private router = inject(Router);
+
+    loadConnectorsListing$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(loadConnectorsListing),
+            switchMap(() =>
+                from(this.connectorService.getConnectors()).pipe(
+                    map((response) =>
+                        loadConnectorsListingSuccess({
+                            response: {
+                                connectors: response.connectors || [],
+                                loadedTimestamp: response.currentTime
+                            }
+                        })
+                    ),
+                    catchError((errorResponse: HttpErrorResponse) =>
+                        of(
+                            connectorsListingBannerApiError({
+                                error: 
this.errorHelper.getErrorString(errorResponse)
+                            })

Review Comment:
   In other listings, we capture the correct `status` to set. With the 
connectors listing, if the initial load fails, the skeleton will be stuck 
because the `status` is explicitly set to `success` and the `loadedTimestamp` 
is never populated. For instance, in the Counter Listing:
   
   ```
                           of(
                               CounterListingActions.loadCountersError({
                                   errorResponse,
                                   loadedTimestamp,
                                   status: loadedTimestamp !== 
initialState.loadedTimestamp ? 'success' : 'pending'
                               })
                           )
   ```



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {

Review Comment:
   Can we be more specific with the return type than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {

Review Comment:
   Can we be more specific with the return type than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/state/purge-connector/purge-connector.effects.spec.ts:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { TestBed } from '@angular/core/testing';
+import { provideMockActions } from '@ngrx/effects/testing';
+import { Observable, of, throwError, Subject } from 'rxjs';
+import { PurgeConnectorEffects } from './purge-connector.effects';
+import { ConnectorService } from '../../service/connector.service';
+import { ErrorHelper } from '../../../../service/error-helper.service';
+import { MatDialog } from '@angular/material/dialog';
+import { provideMockStore, MockStore } from '@ngrx/store/testing';
+import { HttpErrorResponse } from '@angular/common/http';
+import { ConnectorAction, ConnectorActionName, ConnectorEntity } from 
'@nifi/shared';
+import * as ErrorActions from '../../../../state/error/error.actions';
+import { ErrorContextKey } from '../../../../state/error';
+import { selectPurgeConnectorId, selectPurgeDropRequestEntity } from 
'./purge-connector.selectors';
+import { loadConnectorsListing } from 
'../connectors-listing/connectors-listing.actions';
+import {
+    deletePurgeRequest,
+    pollPurgeConnector,
+    pollPurgeConnectorSuccess,
+    promptPurgeConnector,
+    purgeConnectorApiError,
+    showPurgeConnectorResults,
+    startPollingPurgeConnector,
+    stopPollingPurgeConnector,
+    submitPurgeConnector,
+    submitPurgeConnectorSuccess
+} from './purge-connector.actions';
+import { DropRequestEntity } from '../../../flow-designer/state/queue';
+import type { Mock } from 'vitest';
+
+describe('PurgeConnectorEffects', () => {
+    function createMockAction(name: ConnectorActionName, allowed = true, 
reasonNotAllowed?: string): ConnectorAction {
+        const action: ConnectorAction = { name, description: `${name} action`, 
allowed };
+        if (reasonNotAllowed !== undefined) {
+            action.reasonNotAllowed = reasonNotAllowed;
+        }
+        return action;
+    }
+
+    function createMockConnector(
+        options: {
+            id?: string;
+            name?: string;
+            availableActions?: ConnectorAction[];
+        } = {}
+    ): ConnectorEntity {
+        const defaultActions: ConnectorAction[] = options.availableActions ?? [
+            createMockAction('START', true),
+            createMockAction('STOP', true),
+            createMockAction('CONFIGURE', true),
+            createMockAction('DELETE', true)
+        ];
+
+        return {
+            id: options.id || 'connector-123',
+            uri: `http://localhost/nifi-api/connectors/${options.id || 
'connector-123'}`,
+            permissions: { canRead: true, canWrite: true },
+            revision: { version: 1, clientId: 'client-1' },
+            bulletins: [],
+            component: {
+                id: options.id || 'connector-123',
+                type: 'org.apache.nifi.connector.TestConnector',
+                bundle: { group: 'org.apache.nifi', artifact: 'nifi-test-nar', 
version: '1.0.0' },
+                name: options.name || 'Test Connector',
+                state: 'STOPPED',
+                managedProcessGroupId: 'pg-root-default',
+                availableActions: defaultActions
+            },
+            status: { runStatus: 'STOPPED' }
+        };
+    }
+
+    function createMockDropRequestEntity(
+        options: {
+            id?: string;
+            finished?: boolean;
+            percentCompleted?: number;
+            dropped?: string;
+            original?: string;
+            failureReason?: string;
+        } = {}
+    ): DropRequestEntity {
+        return {
+            dropRequest: {
+                id: options.id || 'purge-req-1',
+                uri: `/connectors/connector-123/purge-requests/${options.id || 
'purge-req-1'}`,
+                submissionTime: '2025-01-01T00:00:00.000Z',
+                lastUpdated: '2025-01-01T00:00:01.000Z',
+                percentCompleted: options.percentCompleted ?? 100,
+                finished: options.finished ?? true,
+                failureReason: options.failureReason || '',
+                currentCount: 0,
+                currentSize: 0,
+                current: '0 / 0 bytes',
+                originalCount: 10,
+                originalSize: 1024,
+                original: options.original || '10 / 1 KB',
+                droppedCount: 10,
+                droppedSize: 1024,
+                dropped: options.dropped || '10 / 1 KB',
+                state: 'Completed'
+            }
+        };
+    }
+
+    async function setup(
+        options: {
+            purgeConnectorId?: string | null;
+            purgeDropEntity?: DropRequestEntity | null;
+        } = {}
+    ) {
+        let actions$: Observable<any>;
+
+        const mockConnectorService = {
+            createPurgeRequest: vi.fn(),
+            getPurgeRequest: vi.fn(),
+            deletePurgeRequest: vi.fn()
+        };
+
+        const mockErrorHelper = {
+            getErrorString: vi.fn().mockReturnValue('Error message')
+        };
+
+        const mockDialog = {
+            open: vi.fn(),
+            closeAll: vi.fn()
+        };
+
+        await TestBed.configureTestingModule({
+            providers: [
+                PurgeConnectorEffects,
+                provideMockActions(() => actions$),
+                provideMockStore({
+                    initialState: {},
+                    selectors: [
+                        { selector: selectPurgeConnectorId, value: 
options.purgeConnectorId ?? null },
+                        { selector: selectPurgeDropRequestEntity, value: 
options.purgeDropEntity ?? null }
+                    ]
+                }),
+                { provide: ConnectorService, useValue: mockConnectorService },
+                { provide: ErrorHelper, useValue: mockErrorHelper },
+                { provide: MatDialog, useValue: mockDialog }
+            ]
+        }).compileComponents();
+
+        const effects = TestBed.inject(PurgeConnectorEffects);
+        const store = TestBed.inject(MockStore);
+
+        return {
+            effects,
+            store,
+            actions$: (action: any) => {

Review Comment:
   Can we be more specific with the type of `action` than `any`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {
+        return this.httpClient.post(`${ConnectorService.API}/connectors`, {
+            revision: createConnectorRequest.revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: {
+                bundle: createConnectorRequest.connectorBundle,
+                type: createConnectorRequest.connectorType
+            }
+        });
+    }
+
+    updateConnector(connector: ConnectorEntity): Observable<any> {
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}`, {
+            revision: this.client.getRevision(connector),
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: connector.component,
+            id: connector.id
+        });
+    }
+
+    deleteConnector(connector: ConnectorEntity): Observable<any> {

Review Comment:
   Can we be more specific with the return type than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/libs/shared/src/types/index.ts:
##########
@@ -218,3 +218,63 @@ export function isDefinedAndNotNull<T>() {
             })
         );
 }
+
+export interface Bundle {

Review Comment:
   `Bundle` should already be defined.



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/state/connectors-listing/connectors-listing.effects.spec.ts:
##########
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { TestBed } from '@angular/core/testing';
+import { provideMockActions } from '@ngrx/effects/testing';
+import { Observable, of, throwError } from 'rxjs';
+import { ConnectorsListingEffects } from './connectors-listing.effects';
+import { ConnectorService } from '../../service/connector.service';
+import { ErrorHelper } from '../../../../service/error-helper.service';
+import { Client } from '../../../../service/client.service';
+import { MatDialog } from '@angular/material/dialog';
+import { Router } from '@angular/router';
+import { provideMockStore, MockStore } from '@ngrx/store/testing';
+import { HttpErrorResponse } from '@angular/common/http';
+import { CreateConnector } from 
'../../ui/create-connector/create-connector.component';
+import { ConnectorAction, ConnectorActionName, ConnectorEntity, YesNoDialog } 
from '@nifi/shared';
+import * as ErrorActions from '../../../../state/error/error.actions';
+import { ErrorContextKey } from '../../../../state/error';
+import { selectSaving } from './connectors-listing.selectors';
+import {
+    cancelConnectorDrain,
+    cancelConnectorDrainSuccess,
+    connectorsListingBannerApiError,
+    createConnector,
+    createConnectorSuccess,
+    deleteConnector,
+    deleteConnectorSuccess,
+    discardConnectorConfig,
+    discardConnectorConfigSuccess,
+    drainConnector,
+    drainConnectorSuccess,
+    loadConnectorsListing,
+    loadConnectorsListingSuccess,
+    openNewConnectorDialog,
+    promptConnectorDeletion,
+    promptDiscardConnectorConfig,
+    promptDrainConnector,
+    renameConnector,
+    renameConnectorApiError,
+    renameConnectorSuccess,
+    selectConnector,
+    startConnector,
+    startConnectorSuccess,
+    stopConnector,
+    stopConnectorSuccess
+} from './connectors-listing.actions';
+import type { Mock } from 'vitest';
+
+describe('ConnectorsListingEffects', () => {
+    function createMockAction(name: ConnectorActionName, allowed = true, 
reasonNotAllowed?: string): ConnectorAction {
+        const action: ConnectorAction = { name, description: `${name} action`, 
allowed };
+        if (reasonNotAllowed !== undefined) {
+            action.reasonNotAllowed = reasonNotAllowed;
+        }
+        return action;
+    }
+
+    function createMockConnector(
+        options: {
+            id?: string;
+            name?: string;
+            type?: string;
+            availableActions?: ConnectorAction[];
+        } = {}
+    ): ConnectorEntity {
+        const defaultActions: ConnectorAction[] = options.availableActions ?? [
+            createMockAction('START', true),
+            createMockAction('STOP', true),
+            createMockAction('CONFIGURE', true),
+            createMockAction('DELETE', true)
+        ];
+
+        return {
+            id: options.id || 'connector-123',
+            uri: `http://localhost/nifi-api/connectors/${options.id || 
'connector-123'}`,
+            permissions: { canRead: true, canWrite: true },
+            revision: { version: 1, clientId: 'client-1' },
+            bulletins: [],
+            component: {
+                id: options.id || 'connector-123',
+                type: options.type || 
'org.apache.nifi.connector.TestConnector',
+                bundle: {
+                    group: 'org.apache.nifi',
+                    artifact: 'nifi-test-nar',
+                    version: '1.0.0'
+                },
+                name: options.name || 'Test Connector',
+                state: 'STOPPED',
+                managedProcessGroupId: 'pg-root-default',
+                availableActions: defaultActions
+            },
+            status: { runStatus: 'STOPPED' }
+        };
+    }
+
+    function createMockDialogRef(data: any = {}) {

Review Comment:
   Can we be more specific with the type of `data` than `any`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/ui/connector-table/connector-table.component.ts:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Component, EventEmitter, Input, Output, inject } from '@angular/core';
+import { MatButtonModule } from '@angular/material/button';
+import { MatTableDataSource, MatTableModule } from '@angular/material/table';
+import { MatSortModule, Sort } from '@angular/material/sort';
+import { MatMenuModule } from '@angular/material/menu';
+import { MatTooltipModule } from '@angular/material/tooltip';
+import {
+    ConnectorEntity,
+    ConnectorActionName,
+    NifiTooltipDirective,
+    NiFiCommon,
+    canReadConnector,
+    canModifyConnector,
+    canOperateConnector,
+    isConnectorActionAllowed,
+    getConnectorActionDisabledReason
+} from '@nifi/shared';
+import { ValidationErrorsTipInput } from '../../../../state/shared';
+import { FlowConfiguration } from '../../../../state/flow-configuration';
+import { CurrentUser } from '../../../../state/current-user';
+import { ValidationErrorsTip } from 
'../../../../ui/common/tooltips/validation-errors-tip/validation-errors-tip.component';
+
+@Component({
+    selector: 'connector-table',
+    standalone: true,
+    templateUrl: './connector-table.component.html',
+    imports: [MatButtonModule, MatTableModule, MatSortModule, MatMenuModule, 
MatTooltipModule, NifiTooltipDirective],
+    styleUrls: ['./connector-table.component.scss']
+})
+export class ConnectorTable {
+    private nifiCommon = inject(NiFiCommon);
+
+    protected readonly ValidationErrorsTip = ValidationErrorsTip;
+
+    @Input() initialSortColumn: 'name' | 'type' | 'bundle' | 'state' = 'name';
+    @Input() initialSortDirection: 'asc' | 'desc' = 'asc';
+    activeSort: Sort = {
+        active: this.initialSortColumn,
+        direction: this.initialSortDirection
+    };
+
+    @Input() set connectors(connectorEntities: ConnectorEntity[]) {
+        this.dataSource.data = this.sortEntities(connectorEntities, 
this.activeSort);
+    }
+
+    @Input() selectedConnectorId!: string;
+    @Input() flowConfiguration!: FlowConfiguration;
+    @Input() currentUser!: CurrentUser;
+
+    @Output() selectConnector = new EventEmitter<ConnectorEntity>();
+    @Output() viewConnector = new EventEmitter<ConnectorEntity>();
+    @Output() viewDetails = new EventEmitter<ConnectorEntity>();
+    @Output() configureConnector = new EventEmitter<ConnectorEntity>();
+    @Output() renameConnector = new EventEmitter<ConnectorEntity>();
+    @Output() startConnector = new EventEmitter<ConnectorEntity>();
+    @Output() stopConnector = new EventEmitter<ConnectorEntity>();
+    @Output() deleteConnector = new EventEmitter<ConnectorEntity>();
+    @Output() manageAccessPolicies = new EventEmitter<ConnectorEntity>();
+    @Output() viewDocumentation = new EventEmitter<ConnectorEntity>();
+    @Output() discardConnectorConfig = new EventEmitter<ConnectorEntity>();
+    @Output() drainConnector = new EventEmitter<ConnectorEntity>();
+    @Output() cancelDrainConnector = new EventEmitter<ConnectorEntity>();
+    @Output() purgeConnector = new EventEmitter<ConnectorEntity>();
+
+    displayedColumns: string[] = ['moreDetails', 'name', 'type', 'bundle', 
'state', 'actions'];
+    dataSource: MatTableDataSource<ConnectorEntity> = new 
MatTableDataSource<ConnectorEntity>();
+
+    canRead(entity: ConnectorEntity): boolean {
+        return canReadConnector(entity);
+    }
+
+    hasErrors(entity: ConnectorEntity): boolean {
+        return !this.nifiCommon.isEmpty(entity.component.validationErrors);
+    }
+
+    getValidationErrorsTipData(entity: ConnectorEntity): 
ValidationErrorsTipInput {
+        return {
+            isValidating: entity.component.validationStatus === 'VALIDATING',
+            validationErrors: entity.component.validationErrors || []
+        };
+    }
+
+    canModify(entity: ConnectorEntity): boolean {
+        return canModifyConnector(entity);
+    }
+
+    canOperate(entity: ConnectorEntity): boolean {
+        return canOperateConnector(entity);
+    }
+
+    isActionAllowed(entity: ConnectorEntity, actionName: ConnectorActionName): 
boolean {
+        return isConnectorActionAllowed(entity, actionName);
+    }
+
+    getActionDisabledReason(entity: ConnectorEntity, actionName: 
ConnectorActionName): string {
+        return getConnectorActionDisabledReason(entity, actionName);
+    }
+
+    canConfigure(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'CONFIGURE');
+    }
+
+    canDelete(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'DELETE');
+    }
+
+    canStart(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'START');
+    }
+
+    canStop(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'STOP');
+    }
+
+    canDiscardConfig(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 
'DISCARD_WORKING_CONFIGURATION');
+    }
+
+    canDrain(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'DRAIN_FLOWFILES');
+    }
+
+    canCancelDrain(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'CANCEL_DRAIN_FLOWFILES');
+    }
+
+    canPurge(entity: ConnectorEntity): boolean {
+        return isConnectorActionAllowed(entity, 'PURGE_FLOWFILES');
+    }
+
+    formatName(entity: ConnectorEntity): string {
+        return this.canRead(entity) ? entity.component.name : entity.id;
+    }
+
+    formatType(entity: ConnectorEntity): string {
+        return this.canRead(entity) ? 
this.nifiCommon.formatType(entity.component) : '';
+    }
+
+    formatBundle(entity: ConnectorEntity): string {
+        return this.canRead(entity) ? 
this.nifiCommon.formatBundle(entity.component.bundle) : '';
+    }
+
+    formatState(entity: ConnectorEntity): string {
+        if (!this.canRead(entity)) {
+            return '';
+        }
+        return entity.component.state || 'STOPPED';

Review Comment:
   Is this `|| 'STOPPED'` fallback needed?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {
+        return this.httpClient.post(`${ConnectorService.API}/connectors`, {
+            revision: createConnectorRequest.revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: {
+                bundle: createConnectorRequest.connectorBundle,
+                type: createConnectorRequest.connectorType
+            }
+        });
+    }
+
+    updateConnector(connector: ConnectorEntity): Observable<any> {
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}`, {
+            revision: this.client.getRevision(connector),
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: connector.component,
+            id: connector.id
+        });
+    }
+
+    deleteConnector(connector: ConnectorEntity): Observable<any> {
+        const revision = this.client.getRevision(connector);
+        const params: any = {
+            ...revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        };
+        return 
this.httpClient.delete(`${ConnectorService.API}/connectors/${connector.id}`, { 
params });
+    }
+
+    updateConnectorRunStatus(connector: ConnectorEntity, runStatus: string): 
Observable<any> {
+        const revision = this.client.getRevision(connector);
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}/run-status`,
 {
+            revision,
+            state: runStatus,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        });
+    }
+
+    discardConnectorWorkingConfiguration(connector: ConnectorEntity): 
Observable<any> {

Review Comment:
   Can we be more specific with the return type here, rather than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {
+        return this.httpClient.post(`${ConnectorService.API}/connectors`, {
+            revision: createConnectorRequest.revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: {
+                bundle: createConnectorRequest.connectorBundle,
+                type: createConnectorRequest.connectorType
+            }
+        });
+    }
+
+    updateConnector(connector: ConnectorEntity): Observable<any> {
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}`, {
+            revision: this.client.getRevision(connector),
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: connector.component,
+            id: connector.id
+        });
+    }
+
+    deleteConnector(connector: ConnectorEntity): Observable<any> {
+        const revision = this.client.getRevision(connector);
+        const params: any = {
+            ...revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        };
+        return 
this.httpClient.delete(`${ConnectorService.API}/connectors/${connector.id}`, { 
params });
+    }
+
+    updateConnectorRunStatus(connector: ConnectorEntity, runStatus: string): 
Observable<any> {
+        const revision = this.client.getRevision(connector);
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}/run-status`,
 {
+            revision,
+            state: runStatus,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        });
+    }
+
+    discardConnectorWorkingConfiguration(connector: ConnectorEntity): 
Observable<any> {
+        const revision = this.client.getRevision(connector);
+        const params: any = {
+            ...revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        };
+        return 
this.httpClient.delete(`${ConnectorService.API}/connectors/${connector.id}/working-configuration`,
 {
+            params
+        });
+    }
+
+    drainConnector(connector: ConnectorEntity): Observable<any> {

Review Comment:
   Can we be more specific with the return type here, rather than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/libs/shared/src/types/index.ts:
##########
@@ -218,3 +218,63 @@ export function isDefinedAndNotNull<T>() {
             })
         );
 }
+
+export interface Bundle {
+    artifact: string;
+    group: string;
+    version: string;
+}
+
+export type ConnectorActionName =
+    | 'START'
+    | 'STOP'
+    | 'CONFIGURE'
+    | 'DISCARD_WORKING_CONFIGURATION'
+    | 'PURGE_FLOWFILES'
+    | 'DRAIN_FLOWFILES'
+    | 'CANCEL_DRAIN_FLOWFILES'
+    | 'APPLY_UPDATES'
+    | 'DELETE';
+
+export interface ConnectorAction {
+    name: ConnectorActionName;
+    description: string;
+    allowed: boolean;
+    reasonNotAllowed?: string;
+}
+
+export enum ConnectorState {
+    STARTING = 'STARTING',
+    RUNNING = 'RUNNING',
+    STOPPING = 'STOPPING',
+    STOPPED = 'STOPPED',
+    DRAINING = 'DRAINING',
+    DISABLED = 'DISABLED',
+    PREPARING_FOR_UPDATE = 'PREPARING_FOR_UPDATE',
+    UPDATING = 'UPDATING',
+    UPDATE_FAILED = 'UPDATE_FAILED'
+}
+
+export interface ConnectorComponent {
+    id: string;
+    name: string;
+    type: string;
+    state: string;
+    bundle: Bundle;
+    validationErrors?: string[];
+    validationStatus?: string;
+    multipleVersionsAvailable?: boolean;
+    availableActions: ConnectorAction[];
+    managedProcessGroupId: string;
+}
+
+export interface ConnectorEntity {
+    permissions: Permissions;
+    operatePermissions?: Permissions;
+    revision: Revision;
+    bulletins: BulletinEntity[];
+    id: string;
+    uri: string;
+    status: any;

Review Comment:
   Can we be more specific with the type of `status`, rather than `any`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {
+        return this.httpClient.post(`${ConnectorService.API}/connectors`, {
+            revision: createConnectorRequest.revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: {
+                bundle: createConnectorRequest.connectorBundle,
+                type: createConnectorRequest.connectorType
+            }
+        });
+    }
+
+    updateConnector(connector: ConnectorEntity): Observable<any> {
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}`, {
+            revision: this.client.getRevision(connector),
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: connector.component,
+            id: connector.id
+        });
+    }
+
+    deleteConnector(connector: ConnectorEntity): Observable<any> {
+        const revision = this.client.getRevision(connector);
+        const params: any = {
+            ...revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        };
+        return 
this.httpClient.delete(`${ConnectorService.API}/connectors/${connector.id}`, { 
params });
+    }
+
+    updateConnectorRunStatus(connector: ConnectorEntity, runStatus: string): 
Observable<any> {

Review Comment:
   Can we be more specific with the type here (e.g. the `Observable<any>` return type)?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {
+        return this.httpClient.post(`${ConnectorService.API}/connectors`, {
+            revision: createConnectorRequest.revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: {
+                bundle: createConnectorRequest.connectorBundle,
+                type: createConnectorRequest.connectorType
+            }
+        });
+    }
+
+    updateConnector(connector: ConnectorEntity): Observable<any> {

Review Comment:
   Can we be more specific with the return type here, rather than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/service/connector.service.ts:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Observable } from 'rxjs';
+import { HttpClient } from '@angular/common/http';
+import { Client } from '../../../service/client.service';
+import { ClusterConnectionService } from 
'../../../service/cluster-connection.service';
+import { CreateConnectorRequest } from '../state';
+import { ConnectorEntity } from '@nifi/shared';
+import { DropRequestEntity } from '../../flow-designer/state/queue';
+
+@Injectable({ providedIn: 'root' })
+export class ConnectorService {
+    private httpClient = inject(HttpClient);
+    private client = inject(Client);
+    private clusterConnectionService = inject(ClusterConnectionService);
+
+    private static readonly API: string = '../nifi-api';
+
+    getConnectors(): Observable<any> {
+        return this.httpClient.get(`${ConnectorService.API}/flow/connectors`);
+    }
+
+    createConnector(createConnectorRequest: CreateConnectorRequest): 
Observable<any> {
+        return this.httpClient.post(`${ConnectorService.API}/connectors`, {
+            revision: createConnectorRequest.revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: {
+                bundle: createConnectorRequest.connectorBundle,
+                type: createConnectorRequest.connectorType
+            }
+        });
+    }
+
+    updateConnector(connector: ConnectorEntity): Observable<any> {
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}`, {
+            revision: this.client.getRevision(connector),
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged(),
+            component: connector.component,
+            id: connector.id
+        });
+    }
+
+    deleteConnector(connector: ConnectorEntity): Observable<any> {
+        const revision = this.client.getRevision(connector);
+        const params: any = {
+            ...revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        };
+        return 
this.httpClient.delete(`${ConnectorService.API}/connectors/${connector.id}`, { 
params });
+    }
+
+    updateConnectorRunStatus(connector: ConnectorEntity, runStatus: string): 
Observable<any> {
+        const revision = this.client.getRevision(connector);
+        return 
this.httpClient.put(`${ConnectorService.API}/connectors/${connector.id}/run-status`,
 {
+            revision,
+            state: runStatus,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        });
+    }
+
+    discardConnectorWorkingConfiguration(connector: ConnectorEntity): 
Observable<any> {
+        const revision = this.client.getRevision(connector);
+        const params: any = {
+            ...revision,
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        };
+        return 
this.httpClient.delete(`${ConnectorService.API}/connectors/${connector.id}/working-configuration`,
 {
+            params
+        });
+    }
+
+    drainConnector(connector: ConnectorEntity): Observable<any> {
+        return 
this.httpClient.post(`${ConnectorService.API}/connectors/${connector.id}/drain`,
 {
+            revision: this.client.getRevision(connector),
+            disconnectedNodeAcknowledged: 
this.clusterConnectionService.isDisconnectionAcknowledged()
+        });
+    }
+
+    cancelConnectorDrain(connector: ConnectorEntity): Observable<any> {

Review Comment:
   Can we be more specific with the return type here, rather than `Observable<any>`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/state/connectors-listing/connectors-listing.effects.spec.ts:
##########
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { TestBed } from '@angular/core/testing';
+import { provideMockActions } from '@ngrx/effects/testing';
+import { Observable, of, throwError } from 'rxjs';
+import { ConnectorsListingEffects } from './connectors-listing.effects';
+import { ConnectorService } from '../../service/connector.service';
+import { ErrorHelper } from '../../../../service/error-helper.service';
+import { Client } from '../../../../service/client.service';
+import { MatDialog } from '@angular/material/dialog';
+import { Router } from '@angular/router';
+import { provideMockStore, MockStore } from '@ngrx/store/testing';
+import { HttpErrorResponse } from '@angular/common/http';
+import { CreateConnector } from 
'../../ui/create-connector/create-connector.component';
+import { ConnectorAction, ConnectorActionName, ConnectorEntity, YesNoDialog } 
from '@nifi/shared';
+import * as ErrorActions from '../../../../state/error/error.actions';
+import { ErrorContextKey } from '../../../../state/error';
+import { selectSaving } from './connectors-listing.selectors';
+import {
+    cancelConnectorDrain,
+    cancelConnectorDrainSuccess,
+    connectorsListingBannerApiError,
+    createConnector,
+    createConnectorSuccess,
+    deleteConnector,
+    deleteConnectorSuccess,
+    discardConnectorConfig,
+    discardConnectorConfigSuccess,
+    drainConnector,
+    drainConnectorSuccess,
+    loadConnectorsListing,
+    loadConnectorsListingSuccess,
+    openNewConnectorDialog,
+    promptConnectorDeletion,
+    promptDiscardConnectorConfig,
+    promptDrainConnector,
+    renameConnector,
+    renameConnectorApiError,
+    renameConnectorSuccess,
+    selectConnector,
+    startConnector,
+    startConnectorSuccess,
+    stopConnector,
+    stopConnectorSuccess
+} from './connectors-listing.actions';
+import type { Mock } from 'vitest';
+
+describe('ConnectorsListingEffects', () => {
+    function createMockAction(name: ConnectorActionName, allowed = true, 
reasonNotAllowed?: string): ConnectorAction {
+        const action: ConnectorAction = { name, description: `${name} action`, 
allowed };
+        if (reasonNotAllowed !== undefined) {
+            action.reasonNotAllowed = reasonNotAllowed;
+        }
+        return action;
+    }
+
+    function createMockConnector(
+        options: {
+            id?: string;
+            name?: string;
+            type?: string;
+            availableActions?: ConnectorAction[];
+        } = {}
+    ): ConnectorEntity {
+        const defaultActions: ConnectorAction[] = options.availableActions ?? [
+            createMockAction('START', true),
+            createMockAction('STOP', true),
+            createMockAction('CONFIGURE', true),
+            createMockAction('DELETE', true)
+        ];
+
+        return {
+            id: options.id || 'connector-123',
+            uri: `http://localhost/nifi-api/connectors/${options.id || 
'connector-123'}`,
+            permissions: { canRead: true, canWrite: true },
+            revision: { version: 1, clientId: 'client-1' },
+            bulletins: [],
+            component: {
+                id: options.id || 'connector-123',
+                type: options.type || 
'org.apache.nifi.connector.TestConnector',
+                bundle: {
+                    group: 'org.apache.nifi',
+                    artifact: 'nifi-test-nar',
+                    version: '1.0.0'
+                },
+                name: options.name || 'Test Connector',
+                state: 'STOPPED',
+                managedProcessGroupId: 'pg-root-default',
+                availableActions: defaultActions
+            },
+            status: { runStatus: 'STOPPED' }
+        };
+    }
+
+    function createMockDialogRef(data: any = {}) {
+        return { componentInstance: data };
+    }
+
+    async function setup(
+        options: {
+            saving?: boolean;
+        } = {}
+    ) {
+        let actions$: Observable<any>;
+
+        const mockConnectorService = {
+            getConnectors: vi.fn(),
+            createConnector: vi.fn(),
+            deleteConnector: vi.fn(),
+            updateConnector: vi.fn(),
+            updateConnectorRunStatus: vi.fn(),
+            discardConnectorWorkingConfiguration: vi.fn(),
+            drainConnector: vi.fn(),
+            cancelConnectorDrain: vi.fn()
+        };
+
+        const mockErrorHelper = {
+            getErrorString: vi.fn().mockReturnValue('Error message')
+        };
+
+        const mockClient = {
+            getClientId: vi.fn().mockReturnValue('client-1')
+        };
+
+        const mockDialog = {
+            open: vi.fn(),
+            closeAll: vi.fn()
+        };
+
+        const mockRouter = {
+            navigate: vi.fn()
+        };
+
+        await TestBed.configureTestingModule({
+            providers: [
+                ConnectorsListingEffects,
+                provideMockActions(() => actions$),
+                provideMockStore({
+                    initialState: {},
+                    selectors: [{ selector: selectSaving, value: 
options.saving ?? false }]
+                }),
+                { provide: ConnectorService, useValue: mockConnectorService },
+                { provide: ErrorHelper, useValue: mockErrorHelper },
+                { provide: Client, useValue: mockClient },
+                { provide: MatDialog, useValue: mockDialog },
+                { provide: Router, useValue: mockRouter }
+            ]
+        }).compileComponents();
+
+        const effects = TestBed.inject(ConnectorsListingEffects);
+        const store = TestBed.inject(MockStore);
+
+        return {
+            effects,
+            store,
+            actions$: (action: any) => {

Review Comment:
   Can we be more specific with the type of `action` here, rather than `any`?



##########
nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/connectors/state/purge-connector/purge-connector.effects.ts:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable, inject } from '@angular/core';
+import { Actions, createEffect, ofType } from '@ngrx/effects';
+import { concatLatestFrom } from '@ngrx/operators';
+import { Store } from '@ngrx/store';
+import { MatDialog } from '@angular/material/dialog';
+import { asyncScheduler, catchError, filter, from, interval, map, of, 
switchMap, take, takeUntil, tap } from 'rxjs';
+import { HttpErrorResponse } from '@angular/common/http';
+import { isDefinedAndNotNull, SMALL_DIALOG, YesNoDialog } from '@nifi/shared';
+import { NiFiState } from '../../../../state';
+import { ConnectorService } from '../../service/connector.service';
+import { ErrorHelper } from '../../../../service/error-helper.service';
+import { CancelDialog } from 
'../../../../ui/common/cancel-dialog/cancel-dialog.component';
+import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component';
+import { DropRequest } from '../../../flow-designer/state/queue';
+import * as ErrorActions from '../../../../state/error/error.actions';
+import { ErrorContextKey } from '../../../../state/error';
+import { selectPurgeConnectorId, selectPurgeDropRequestEntity } from 
'./purge-connector.selectors';
+import { loadConnectorsListing } from 
'../connectors-listing/connectors-listing.actions';
+import {
+    deletePurgeRequest,
+    pollPurgeConnector,
+    pollPurgeConnectorSuccess,
+    promptPurgeConnector,
+    purgeConnectorApiError,
+    resetPurgeConnectorState,
+    showPurgeConnectorResults,
+    startPollingPurgeConnector,
+    stopPollingPurgeConnector,
+    submitPurgeConnector,
+    submitPurgeConnectorSuccess
+} from './purge-connector.actions';
+
+@Injectable()
+export class PurgeConnectorEffects {
+    private actions$ = inject(Actions); // stream of dispatched actions; each effect below filters it with ofType(...)
+    private store = inject<Store<NiFiState>>(Store); // used to select purge state and to dispatch follow-up actions
+    private connectorService = inject(ConnectorService); // creates, polls and deletes purge requests
+    private errorHelper = inject(ErrorHelper); // converts an HttpErrorResponse into the error string carried by purgeConnectorApiError
+    private dialog = inject(MatDialog); // opens the confirmation / progress dialogs and closes them again
+
+    promptPurgeConnector$ = createEffect( // On promptPurgeConnector: open a yes/no confirmation dialog for the requested connector
+        () =>
+            this.actions$.pipe(
+                ofType(promptPurgeConnector),
+                map((action) => action.request), // only the request payload is needed downstream
+                tap((request) => {
+                    const dialogRef = 
this.dialog.open<YesNoDialog>(YesNoDialog, {
+                        ...SMALL_DIALOG,
+                        data: {
+                            title: 'Purge Connector',
+                            message: `Are you sure you want to purge connector 
'${request.connector.component.name}'? All FlowFiles waiting at the time of the 
request will be removed.`
+                        }
+                    });
+
+                    
dialogRef.componentInstance!.yes.pipe(take(1)).subscribe(() => { // take(1): react to the first `yes` emission only, then unsubscribe
+                        this.store.dispatch(submitPurgeConnector({ request })); // hand the same request on to submitPurgeConnector
+                    });
+                })
+            ),
+        { dispatch: false } // this effect emits no action itself; submitPurgeConnector is dispatched manually in the subscription above
+    );
+
+    submitPurgeConnector$ = createEffect(() => // On submitPurgeConnector: show a progress dialog and create the purge request
+        this.actions$.pipe(
+            ofType(submitPurgeConnector),
+            map((action) => action.request),
+            switchMap((request) => { // switchMap: a newer submit replaces any create call still in flight
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Purge Connector',
+                        message: 'Waiting for connector to be purged...'
+                    },
+                    disableClose: true
+                });
+
+                
dialogReference.componentInstance.exit.pipe(take(1)).subscribe(() => { // first `exit` emission from the cancel dialog
+                    this.store.dispatch(stopPollingPurgeConnector()); // exiting the dialog stops polling
+                });
+
+                return 
from(this.connectorService.createPurgeRequest(request.connector.id)).pipe( // the purge request is created for the connector's id
+                    map((response) =>
+                        submitPurgeConnectorSuccess({
+                            response: { dropEntity: response } // the service response is carried as the drop entity
+                        })
+                    ),
+                    catchError((errorResponse: HttpErrorResponse) => // a failure becomes an action instead of erroring the effect stream
+                        of(
+                            purgeConnectorApiError({
+                                error: 
this.errorHelper.getErrorString(errorResponse)
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    submitPurgeConnectorSuccess$ = createEffect(() => // After the purge request is created: clean up immediately or start polling
+        this.actions$.pipe(
+            ofType(submitPurgeConnectorSuccess),
+            map((action) => action.response),
+            switchMap((response) => {
+                const dropRequest: DropRequest = 
response.dropEntity.dropRequest;
+                if (dropRequest.finished) { // already finished: skip polling
+                    return of(deletePurgeRequest());
+                } else { // not finished yet: poll until it is
+                    return of(startPollingPurgeConnector());
+                }
+            })
+        )
+    );
+
+    startPollingPurgeConnector$ = createEffect(() => // Emits pollPurgeConnector on a fixed interval until polling is stopped
+        this.actions$.pipe(
+            ofType(startPollingPurgeConnector),
+            switchMap(() => // switchMap: a new start action replaces any timer that is already running
+                interval(2000, 
asyncScheduler).pipe(takeUntil(this.actions$.pipe(ofType(stopPollingPurgeConnector)))) // tick every 2000 ms; the timer completes when stopPollingPurgeConnector is dispatched
+            ),
+            switchMap(() => of(pollPurgeConnector())) // one poll action per tick
+        )
+    );
+
+    pollPurgeConnector$ = createEffect(() => // On pollPurgeConnector: fetch the current state of the purge (drop) request
+        this.actions$.pipe(
+            ofType(pollPurgeConnector),
+            concatLatestFrom(() => [ // pair the action with the latest drop entity and connector id from the store
+                
this.store.select(selectPurgeDropRequestEntity).pipe(isDefinedAndNotNull()), // presumably drops null/undefined entities; confirm against @nifi/shared
+                this.store.select(selectPurgeConnectorId)
+            ]),
+            switchMap(([, dropEntity, connectorId]) => { // the action itself is not needed, only the selected state
+                if (!connectorId) { // without a connector id the request cannot be addressed
+                    return of(purgeConnectorApiError({ error: 'Purge request 
state is missing' }));
+                }
+
+                return from(this.connectorService.getPurgeRequest(connectorId, 
dropEntity.dropRequest.id)).pipe( // look up the request by connector id and drop request id
+                    map((response) =>
+                        pollPurgeConnectorSuccess({
+                            response: { dropEntity: response }
+                        })
+                    ),
+                    catchError((errorResponse: HttpErrorResponse) => // a failure becomes an action instead of erroring the effect stream
+                        of(
+                            purgeConnectorApiError({
+                                error: 
this.errorHelper.getErrorString(errorResponse)
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    pollPurgeConnectorSuccess$ = createEffect(() => // Stops polling once a poll response reports the drop request as finished
+        this.actions$.pipe(
+            ofType(pollPurgeConnectorSuccess),
+            map((action) => action.response),
+            filter((response) => response.dropEntity.dropRequest.finished), // unfinished responses are ignored; the timer keeps polling
+            switchMap(() => of(stopPollingPurgeConnector()))
+        )
+    );
+
+    stopPollingPurgeConnector$ = createEffect(() => // Every stopPollingPurgeConnector is followed by deletePurgeRequest
+        this.actions$.pipe(
+            ofType(stopPollingPurgeConnector),
+            switchMap(() => of(deletePurgeRequest())) // applies whichever code path dispatched the stop action
+        )
+    );
+
+    deletePurgeRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(deletePurgeRequest),
+            concatLatestFrom(() => [
+                
this.store.select(selectPurgeDropRequestEntity).pipe(isDefinedAndNotNull()),
+                this.store.select(selectPurgeConnectorId)
+            ]),
+            switchMap(([, dropEntity, connectorId]) => {
+                this.dialog.closeAll();
+
+                if (!connectorId) {
+                    return of(loadConnectorsListing());
+                }
+
+                return 
from(this.connectorService.deletePurgeRequest(connectorId, 
dropEntity.dropRequest.id)).pipe(
+                    map((response) =>
+                        showPurgeConnectorResults({
+                            request: { dropEntity: response }
+                        })
+                    ),
+                    catchError(() =>
+                        of(
+                            showPurgeConnectorResults({
+                                request: { dropEntity }
+                            })

Review Comment:
   Looks like the error is being silently swallowed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to