This is an automated email from the ASF dual-hosted git repository.

rfellows pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/NIFI-15258 by this push:
     new 3f981da5b3 NIFI-15549: Adding endpoints for viewing and clearing state 
for compo… (#10855)
3f981da5b3 is described below

commit 3f981da5b378527c520a141440509501ed04cc31
Author: Matt Gilman <[email protected]>
AuthorDate: Thu Feb 5 12:05:53 2026 -0500

    NIFI-15549: Adding endpoints for viewing and clearing state for compo… 
(#10855)
    
    * NIFI-15549: Adding endpoints for viewing and clearing state for 
components that are managed by a Connector.
    
    * NIFI-15549: Addressing review feedback.
---
 .../endpoints/ComponentStateEndpointMerger.java    |   6 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java     |  53 +++++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  69 ++++++
 .../org/apache/nifi/web/api/ConnectorResource.java | 240 ++++++++++++++++++++
 .../nifi/web/StandardNiFiServiceFacadeTest.java    | 250 +++++++++++++++++++++
 .../apache/nifi/web/api/TestConnectorResource.java | 116 ++++++++++
 .../nifi/toolkit/client/ConnectorClient.java       |  45 ++++
 .../toolkit/client/impl/JerseyConnectorClient.java |  77 +++++++
 8 files changed, 855 insertions(+), 1 deletion(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
index 60686c5369..02e06b93d8 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
@@ -38,6 +38,8 @@ public class ComponentStateEndpointMerger extends 
AbstractSingleDTOEndpoint<Comp
     public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state");
     public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state");
     public static final Pattern FLOW_ANALYSIS_RULE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/flow-analysis-rules/[a-f0-9\\-]{36}/state");
+    public static final Pattern CONNECTOR_PROCESSOR_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/processors/[a-f0-9\\-]{36}/state");
+    public static final Pattern CONNECTOR_CONTROLLER_SERVICE_STATE_URI_PATTERN 
= 
Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/controller-services/[a-f0-9\\-]{36}/state");
 
     @Override
     public boolean canHandle(URI uri, String method) {
@@ -48,7 +50,9 @@ public class ComponentStateEndpointMerger extends 
AbstractSingleDTOEndpoint<Comp
         return PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
             || 
CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
             || 
REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
-            || 
FLOW_ANALYSIS_RULE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
+            || 
FLOW_ANALYSIS_RULE_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
+            || 
CONNECTOR_PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
+            || 
CONNECTOR_CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index c350b2b983..29788ca773 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -2101,6 +2101,59 @@ public interface NiFiServiceFacade {
      */
     ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId);
 
+    /**
+     * Gets the state for a processor within a connector's managed process 
group.
+     *
+     * @param connectorId the connector id
+     * @param processorId the processor id
+     * @return the component state
+     */
+    ComponentStateDTO getConnectorProcessorState(String connectorId, String 
processorId);
+
+    /**
+     * Verifies the processor state within a connector could be cleared.
+     *
+     * @param connectorId the connector id
+     * @param processorId the processor id
+     */
+    void verifyCanClearConnectorProcessorState(String connectorId, String 
processorId);
+
+    /**
+     * Clears the state for a processor within a connector's managed process 
group.
+     *
+     * @param connectorId       the connector id
+     * @param processorId       the processor id
+     * @param componentStateDTO the state of the processor
+     * @return the cleared component state
+     */
+    ComponentStateDTO clearConnectorProcessorState(String connectorId, String 
processorId, ComponentStateDTO componentStateDTO);
+
+    /**
+     * Gets the state for a controller service within a connector's managed 
process group.
+     *
+     * @param connectorId         the connector id
+     * @param controllerServiceId the controller service id
+     * @return the component state
+     */
+    ComponentStateDTO getConnectorControllerServiceState(String connectorId, 
String controllerServiceId);
+
+    /**
+     * Verifies the controller service state within a connector could be 
cleared.
+     *
+     * @param connectorId         the connector id
+     * @param controllerServiceId the controller service id
+     */
+    void verifyCanClearConnectorControllerServiceState(String connectorId, 
String controllerServiceId);
+
+    /**
+     * Clears the state for a controller service within a connector's managed 
process group.
+     *
+     * @param connectorId         the connector id
+     * @param controllerServiceId the controller service id
+     * @param componentStateDTO   the state of the controller service
+     * @return the cleared component state
+     */
+    ComponentStateDTO clearConnectorControllerServiceState(String connectorId, 
String controllerServiceId, ComponentStateDTO componentStateDTO);
 
     // ----------------------------------------
     // Label methods
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 654644d552..1bfe0de934 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -415,6 +415,7 @@ import org.apache.nifi.web.api.request.FlowMetricsRegistry;
 import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.AccessPolicyDAO;
+import org.apache.nifi.web.dao.ComponentStateDAO;
 import org.apache.nifi.web.dao.ConnectionDAO;
 import org.apache.nifi.web.dao.ConnectorDAO;
 import org.apache.nifi.web.dao.ControllerServiceDAO;
@@ -497,6 +498,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     private BulletinRepository bulletinRepository;
 
     // data access objects
+    private ComponentStateDAO componentStateDAO;
     private ProcessorDAO processorDAO;
     private ProcessGroupDAO processGroupDAO;
     private RemoteProcessGroupDAO remoteProcessGroupDAO;
@@ -2099,6 +2101,68 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         return dtoFactory.createComponentStateDTO(remoteProcessGroupId, 
remoteProcessGroup.getClass(), localState, clusterState);
     }
 
+    @Override
+    public ComponentStateDTO getConnectorProcessorState(final String 
connectorId, final String processorId) {
+        final ProcessorNode processor = locateConnectorProcessor(connectorId, 
processorId);
+        final StateMap clusterState = isClustered() ? 
componentStateDAO.getState(processor, Scope.CLUSTER) : null;
+        final StateMap localState = componentStateDAO.getState(processor, 
Scope.LOCAL);
+        return dtoFactory.createComponentStateDTO(processorId, 
processor.getProcessor().getClass(), localState, clusterState);
+    }
+
+    @Override
+    public void verifyCanClearConnectorProcessorState(final String 
connectorId, final String processorId) {
+        final ProcessorNode processor = locateConnectorProcessor(connectorId, 
processorId);
+        processor.verifyCanClearState();
+    }
+
+    @Override
+    public ComponentStateDTO clearConnectorProcessorState(final String 
connectorId, final String processorId, final ComponentStateDTO 
componentStateDTO) {
+        final ProcessorNode processor = locateConnectorProcessor(connectorId, 
processorId);
+        componentStateDAO.clearState(processor, componentStateDTO);
+        return getConnectorProcessorState(connectorId, processorId);
+    }
+
+    @Override
+    public ComponentStateDTO getConnectorControllerServiceState(final String 
connectorId, final String controllerServiceId) {
+        final ControllerServiceNode controllerService = 
locateConnectorControllerService(connectorId, controllerServiceId);
+        final StateMap clusterState = isClustered() ? 
componentStateDAO.getState(controllerService, Scope.CLUSTER) : null;
+        final StateMap localState = 
componentStateDAO.getState(controllerService, Scope.LOCAL);
+        return dtoFactory.createComponentStateDTO(controllerServiceId, 
controllerService.getControllerServiceImplementation().getClass(), localState, 
clusterState);
+    }
+
+    @Override
+    public void verifyCanClearConnectorControllerServiceState(final String 
connectorId, final String controllerServiceId) {
+        final ControllerServiceNode controllerService = 
locateConnectorControllerService(connectorId, controllerServiceId);
+        controllerService.verifyCanClearState();
+    }
+
+    @Override
+    public ComponentStateDTO clearConnectorControllerServiceState(final String 
connectorId, final String controllerServiceId, final ComponentStateDTO 
componentStateDTO) {
+        final ControllerServiceNode controllerService = 
locateConnectorControllerService(connectorId, controllerServiceId);
+        componentStateDAO.clearState(controllerService, componentStateDTO);
+        return getConnectorControllerServiceState(connectorId, 
controllerServiceId);
+    }
+
+    private ProcessorNode locateConnectorProcessor(final String connectorId, 
final String processorId) {
+        final ConnectorNode connectorNode = 
connectorDAO.getConnector(connectorId);
+        final ProcessGroup managedGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+        final ProcessorNode processor = 
managedGroup.findProcessor(processorId);
+        if (processor == null) {
+            throw new ResourceNotFoundException("Unable to find processor with 
id '%s' within connector '%s'.".formatted(processorId, connectorId));
+        }
+        return processor;
+    }
+
+    private ControllerServiceNode locateConnectorControllerService(final 
String connectorId, final String controllerServiceId) {
+        final ConnectorNode connectorNode = 
connectorDAO.getConnector(connectorId);
+        final ProcessGroup managedGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+        final ControllerServiceNode controllerService = 
managedGroup.findControllerService(controllerServiceId, false, true);
+        if (controllerService == null) {
+            throw new ResourceNotFoundException("Unable to find controller 
service with id '%s' within connector '%s'.".formatted(controllerServiceId, 
connectorId));
+        }
+        return controllerService;
+    }
+
     @Override
     public ConnectionEntity deleteConnection(final Revision revision, final 
String connectionId) {
         final Connection connection = 
connectionDAO.getConnection(connectionId);
@@ -7800,6 +7864,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         this.parameterProviderDAO = parameterProviderDAO;
     }
 
+    @Autowired
+    public void setComponentStateDAO(final ComponentStateDAO 
componentStateDAO) {
+        this.componentStateDAO = componentStateDAO;
+    }
+
     @Autowired
     public void setParameterContextDAO(final ParameterContextDAO 
parameterContextDAO) {
         this.parameterContextDAO = parameterContextDAO;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 00a44a0575..1f4b0a9932 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -74,6 +74,7 @@ import org.apache.nifi.web.api.concurrent.StandardUpdateStep;
 import org.apache.nifi.web.api.concurrent.UpdateStep;
 import org.apache.nifi.web.api.dto.AssetDTO;
 import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
 import org.apache.nifi.web.api.dto.ConnectorDTO;
@@ -82,6 +83,7 @@ import 
org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.entity.AssetEntity;
 import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -2146,6 +2148,244 @@ public class ConnectorResource extends 
ApplicationResource {
             .build();
     }
 
+    // -----------------
+    // Processor State
+    // -----------------
+
+    /**
+     * Gets the state for a processor within a connector.
+     *
+     * @param connectorId the connector id
+     * @param processorId the processor id
+     * @return a ComponentStateEntity
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/processors/{processorId}/state")
+    @Operation(
+            summary = "Gets the state for a processor within a connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response getConnectorProcessorState(
+            @Parameter(description = "The connector id.", required = true)
+            @PathParam("id") final String connectorId,
+            @Parameter(description = "The processor id.", required = true)
+            @PathParam("processorId") final String processorId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connector = lookup.getConnector(connectorId);
+            connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+        });
+
+        final ComponentStateDTO state = 
serviceFacade.getConnectorProcessorState(connectorId, processorId);
+
+        final ComponentStateEntity entity = new ComponentStateEntity();
+        entity.setComponentState(state);
+
+        return generateOkResponse(entity).build();
+    }
+
+    /**
+     * Clears the state for a processor within a connector.
+     *
+     * @param connectorId the connector id
+     * @param processorId the processor id
+     * @return a ComponentStateEntity
+     */
+    @POST
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/processors/{processorId}/state/clear-requests")
+    @Operation(
+            summary = "Clears the state for a processor within a connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response clearConnectorProcessorState(
+            @Parameter(description = "The connector id.", required = true)
+            @PathParam("id") final String connectorId,
+            @Parameter(description = "The processor id.", required = true)
+            @PathParam("processorId") final String processorId,
+            @Parameter(description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.", required = false)
+            final ComponentStateEntity componentStateEntity) {
+
+        if (isReplicateRequest()) {
+            if (componentStateEntity == null) {
+                return replicate(HttpMethod.POST);
+            } else {
+                return replicate(HttpMethod.POST, componentStateEntity);
+            }
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(connectorId);
+
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                lookup -> {
+                    final Authorizable connector = 
lookup.getConnector(connectorId);
+                    connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> 
serviceFacade.verifyCanClearConnectorProcessorState(connectorId, processorId),
+                (connectorEntity) -> {
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearConnectorProcessorState(connectorEntity.getId(), 
processorId, expectedState);
+
+                    // generate the response entity
+                    final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
+
+                    // generate the response
+                    return generateOkResponse(entity).build();
+                }
+        );
+    }
+
+    // -----------------
+    // Controller Service State
+    // -----------------
+
+    /**
+     * Gets the state for a controller service within a connector.
+     *
+     * @param connectorId         the connector id
+     * @param controllerServiceId the controller service id
+     * @return a ComponentStateEntity
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/controller-services/{controllerServiceId}/state")
+    @Operation(
+            summary = "Gets the state for a controller service within a 
connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response getConnectorControllerServiceState(
+            @Parameter(description = "The connector id.", required = true)
+            @PathParam("id") final String connectorId,
+            @Parameter(description = "The controller service id.", required = 
true)
+            @PathParam("controllerServiceId") final String 
controllerServiceId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connector = lookup.getConnector(connectorId);
+            connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+        });
+
+        final ComponentStateDTO state = 
serviceFacade.getConnectorControllerServiceState(connectorId, 
controllerServiceId);
+
+        final ComponentStateEntity entity = new ComponentStateEntity();
+        entity.setComponentState(state);
+
+        return generateOkResponse(entity).build();
+    }
+
+    /**
+     * Clears the state for a controller service within a connector.
+     *
+     * @param connectorId         the connector id
+     * @param controllerServiceId the controller service id
+     * @return a ComponentStateEntity
+     */
+    @POST
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
+    @Produces(MediaType.APPLICATION_JSON)
+    
@Path("{id}/controller-services/{controllerServiceId}/state/clear-requests")
+    @Operation(
+            summary = "Clears the state for a controller service within a 
connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response clearConnectorControllerServiceState(
+            @Parameter(description = "The connector id.", required = true)
+            @PathParam("id") final String connectorId,
+            @Parameter(description = "The controller service id.", required = 
true)
+            @PathParam("controllerServiceId") final String controllerServiceId,
+            @Parameter(description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.", required = false)
+            final ComponentStateEntity componentStateEntity) {
+
+        if (isReplicateRequest()) {
+            if (componentStateEntity == null) {
+                return replicate(HttpMethod.POST);
+            } else {
+                return replicate(HttpMethod.POST, componentStateEntity);
+            }
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(connectorId);
+
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                lookup -> {
+                    final Authorizable connector = 
lookup.getConnector(connectorId);
+                    connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> 
serviceFacade.verifyCanClearConnectorControllerServiceState(connectorId, 
controllerServiceId),
+                (connectorEntity) -> {
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearConnectorControllerServiceState(connectorEntity.getId(), 
controllerServiceId, expectedState);
+
+                    // generate the response entity
+                    final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
+
+                    // generate the response
+                    return generateOkResponse(entity).build();
+                }
+        );
+    }
+
     // -----------------
     // setters
     // -----------------
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 1779e797f2..a754520732 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -40,6 +40,9 @@ import org.apache.nifi.components.connector.ConnectorNode;
 import org.apache.nifi.components.connector.FrameworkFlowContext;
 import org.apache.nifi.components.connector.Secret;
 import org.apache.nifi.components.connector.secrets.AuthorizableSecret;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
@@ -67,6 +70,7 @@ import org.apache.nifi.groups.VersionedComponentAdditions;
 import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.registry.flow.FlowRegistryUtil;
 import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -89,6 +93,7 @@ import org.apache.nifi.util.MockBulletinRepository;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.validation.RuleViolation;
 import org.apache.nifi.validation.RuleViolationsManager;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersDTO;
 import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
@@ -112,6 +117,7 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.TenantsEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
+import org.apache.nifi.web.dao.ComponentStateDAO;
 import org.apache.nifi.web.dao.ConnectorDAO;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@@ -1779,4 +1785,248 @@ public class StandardNiFiServiceFacadeTest {
         assertNotNull(result.getSecrets());
         assertTrue(result.getSecrets().isEmpty());
     }
+
+    // -----------------
+    // Connector State Tests
+    // -----------------
+
+    @Test
+    public void testGetConnectorProcessorState() {
+        final String connectorId = "connector-id";
+        final String processorId = "processor-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        final ComponentStateDAO componentStateDAO = 
mock(ComponentStateDAO.class);
+        final DtoFactory dtoFactory = mock(DtoFactory.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+        serviceFacade.setComponentStateDAO(componentStateDAO);
+        serviceFacade.setDtoFactory(dtoFactory);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+        final ProcessorNode processorNode = mock(ProcessorNode.class);
+        final Processor processor = mock(Processor.class);
+        final StateMap localStateMap = mock(StateMap.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        
when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode);
+        when(processorNode.getProcessor()).thenReturn(processor);
+        when(componentStateDAO.getState(processorNode, 
Scope.LOCAL)).thenReturn(localStateMap);
+
+        final ComponentStateDTO expectedDto = new ComponentStateDTO();
+        expectedDto.setComponentId(processorId);
+        when(dtoFactory.createComponentStateDTO(eq(processorId), 
eq(processor.getClass()), eq(localStateMap), any())).thenReturn(expectedDto);
+
+        final ComponentStateDTO result = 
serviceFacade.getConnectorProcessorState(connectorId, processorId);
+
+        assertNotNull(result);
+        assertEquals(processorId, result.getComponentId());
+        verify(connectorDAO).getConnector(connectorId);
+        verify(managedProcessGroup).findProcessor(processorId);
+        verify(componentStateDAO).getState(processorNode, Scope.LOCAL);
+    }
+
+    @Test
+    public void testGetConnectorProcessorStateNotFound() {
+        final String connectorId = "connector-id";
+        final String processorId = "non-existent-processor-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        when(managedProcessGroup.findProcessor(processorId)).thenReturn(null);
+
+        assertThrows(ResourceNotFoundException.class, () -> 
serviceFacade.getConnectorProcessorState(connectorId, processorId));
+    }
+
+    @Test
+    public void testVerifyCanClearConnectorProcessorState() {
+        final String connectorId = "connector-id";
+        final String processorId = "processor-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+        final ProcessorNode processorNode = mock(ProcessorNode.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        
when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode);
+
+        // Should not throw
+        serviceFacade.verifyCanClearConnectorProcessorState(connectorId, 
processorId);
+
+        verify(processorNode).verifyCanClearState();
+    }
+
+    @Test
+    public void testClearConnectorProcessorState() {
+        final String connectorId = "connector-id";
+        final String processorId = "processor-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        final ComponentStateDAO componentStateDAO = 
mock(ComponentStateDAO.class);
+        final DtoFactory dtoFactory = mock(DtoFactory.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+        serviceFacade.setComponentStateDAO(componentStateDAO);
+        serviceFacade.setDtoFactory(dtoFactory);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+        final ProcessorNode processorNode = mock(ProcessorNode.class);
+        final Processor processor = mock(Processor.class);
+        final StateMap localStateMap = mock(StateMap.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        
when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode);
+        when(processorNode.getProcessor()).thenReturn(processor);
+        when(componentStateDAO.getState(processorNode, 
Scope.LOCAL)).thenReturn(localStateMap);
+
+        final ComponentStateDTO expectedDto = new ComponentStateDTO();
+        expectedDto.setComponentId(processorId);
+        when(dtoFactory.createComponentStateDTO(eq(processorId), 
eq(processor.getClass()), eq(localStateMap), any())).thenReturn(expectedDto);
+
+        final ComponentStateDTO result = 
serviceFacade.clearConnectorProcessorState(connectorId, processorId, null);
+
+        assertNotNull(result);
+        assertEquals(processorId, result.getComponentId());
+        verify(componentStateDAO).clearState(processorNode, null);
+    }
+
+    @Test
+    public void testGetConnectorControllerServiceState() {
+        final String connectorId = "connector-id";
+        final String controllerServiceId = "controller-service-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        final ComponentStateDAO componentStateDAO = 
mock(ComponentStateDAO.class);
+        final DtoFactory dtoFactory = mock(DtoFactory.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+        serviceFacade.setComponentStateDAO(componentStateDAO);
+        serviceFacade.setDtoFactory(dtoFactory);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+        final ControllerServiceNode controllerServiceNode = 
mock(ControllerServiceNode.class);
+        final ControllerService controllerService = 
mock(ControllerService.class);
+        final StateMap localStateMap = mock(StateMap.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        when(managedProcessGroup.findControllerService(controllerServiceId, 
false, true)).thenReturn(controllerServiceNode);
+        
when(controllerServiceNode.getControllerServiceImplementation()).thenReturn(controllerService);
+        when(componentStateDAO.getState(controllerServiceNode, 
Scope.LOCAL)).thenReturn(localStateMap);
+
+        final ComponentStateDTO expectedDto = new ComponentStateDTO();
+        expectedDto.setComponentId(controllerServiceId);
+        when(dtoFactory.createComponentStateDTO(eq(controllerServiceId), 
eq(controllerService.getClass()), eq(localStateMap), 
any())).thenReturn(expectedDto);
+
+        final ComponentStateDTO result = 
serviceFacade.getConnectorControllerServiceState(connectorId, 
controllerServiceId);
+
+        assertNotNull(result);
+        assertEquals(controllerServiceId, result.getComponentId());
+        verify(connectorDAO).getConnector(connectorId);
+        verify(managedProcessGroup).findControllerService(controllerServiceId, 
false, true);
+        verify(componentStateDAO).getState(controllerServiceNode, Scope.LOCAL);
+    }
+
+    @Test
+    public void testGetConnectorControllerServiceStateNotFound() {
+        final String connectorId = "connector-id";
+        final String controllerServiceId = 
"non-existent-controller-service-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        when(managedProcessGroup.findControllerService(controllerServiceId, 
false, true)).thenReturn(null);
+
+        assertThrows(ResourceNotFoundException.class, () -> 
serviceFacade.getConnectorControllerServiceState(connectorId, 
controllerServiceId));
+    }
+
+    @Test
+    public void testVerifyCanClearConnectorControllerServiceState() {
+        final String connectorId = "connector-id";
+        final String controllerServiceId = "controller-service-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+        final ControllerServiceNode controllerServiceNode = 
mock(ControllerServiceNode.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        when(managedProcessGroup.findControllerService(controllerServiceId, 
false, true)).thenReturn(controllerServiceNode);
+
+        // Should not throw
+        
serviceFacade.verifyCanClearConnectorControllerServiceState(connectorId, 
controllerServiceId);
+
+        verify(controllerServiceNode).verifyCanClearState();
+    }
+
+    @Test
+    public void testClearConnectorControllerServiceState() {
+        final String connectorId = "connector-id";
+        final String controllerServiceId = "controller-service-id";
+
+        final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+        final ComponentStateDAO componentStateDAO = 
mock(ComponentStateDAO.class);
+        final DtoFactory dtoFactory = mock(DtoFactory.class);
+        serviceFacade.setConnectorDAO(connectorDAO);
+        serviceFacade.setComponentStateDAO(componentStateDAO);
+        serviceFacade.setDtoFactory(dtoFactory);
+
+        final ConnectorNode connectorNode = mock(ConnectorNode.class);
+        final FrameworkFlowContext flowContext = 
mock(FrameworkFlowContext.class);
+        final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+        final ControllerServiceNode controllerServiceNode = 
mock(ControllerServiceNode.class);
+        final ControllerService controllerService = 
mock(ControllerService.class);
+        final StateMap localStateMap = mock(StateMap.class);
+
+        when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+        when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+        
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+        when(managedProcessGroup.findControllerService(controllerServiceId, 
false, true)).thenReturn(controllerServiceNode);
+        
when(controllerServiceNode.getControllerServiceImplementation()).thenReturn(controllerService);
+        when(componentStateDAO.getState(controllerServiceNode, 
Scope.LOCAL)).thenReturn(localStateMap);
+
+        final ComponentStateDTO expectedDto = new ComponentStateDTO();
+        expectedDto.setComponentId(controllerServiceId);
+        when(dtoFactory.createComponentStateDTO(eq(controllerServiceId), 
eq(controllerService.getClass()), eq(localStateMap), 
any())).thenReturn(expectedDto);
+
+        final ComponentStateDTO result = 
serviceFacade.clearConnectorControllerServiceState(connectorId, 
controllerServiceId, null);
+
+        assertNotNull(result);
+        assertEquals(controllerServiceId, result.getComponentId());
+        verify(componentStateDAO).clearState(controllerServiceNode, null);
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
index 9ec1afb42c..5d50471cf9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
@@ -29,10 +29,12 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.AllowableValueDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConnectorDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
 import org.apache.nifi.web.api.entity.AllowableValueEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
 import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
 import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
@@ -104,6 +106,8 @@ public class TestConnectorResource {
     private static final String PROPERTY_GROUP_NAME = "test-group";
     private static final String PROPERTY_NAME = "test-property";
     private static final String PROCESS_GROUP_ID = "test-process-group-id";
+    private static final String PROCESSOR_ID = "test-processor-id";
+    private static final String CONTROLLER_SERVICE_ID = 
"test-controller-service-id";
 
     @BeforeEach
     public void setUp() throws Exception {
@@ -629,4 +633,116 @@ public class TestConnectorResource {
 
         verify(serviceFacade, never()).createConnector(any(Revision.class), 
any(ConnectorDTO.class));
     }
+
+    @Test
+    public void testGetConnectorProcessorState() {
+        final ComponentStateDTO stateDTO = new ComponentStateDTO();
+        stateDTO.setComponentId(PROCESSOR_ID);
+        when(serviceFacade.getConnectorProcessorState(CONNECTOR_ID, 
PROCESSOR_ID)).thenReturn(stateDTO);
+
+        try (Response response = 
connectorResource.getConnectorProcessorState(CONNECTOR_ID, PROCESSOR_ID)) {
+            assertEquals(200, response.getStatus());
+            final ComponentStateEntity entity = (ComponentStateEntity) 
response.getEntity();
+            assertEquals(PROCESSOR_ID, 
entity.getComponentState().getComponentId());
+        }
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        verify(serviceFacade).getConnectorProcessorState(CONNECTOR_ID, 
PROCESSOR_ID);
+    }
+
+    @Test
+    public void testGetConnectorProcessorStateNotAuthorized() {
+        
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+        assertThrows(AccessDeniedException.class, () ->
+            connectorResource.getConnectorProcessorState(CONNECTOR_ID, 
PROCESSOR_ID));
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        verify(serviceFacade, never()).getConnectorProcessorState(anyString(), 
anyString());
+    }
+
+    @Test
+    public void testClearConnectorProcessorState() {
+        final ComponentStateDTO stateDTO = new ComponentStateDTO();
+        stateDTO.setComponentId(PROCESSOR_ID);
+        when(serviceFacade.clearConnectorProcessorState(eq(CONNECTOR_ID), 
eq(PROCESSOR_ID), any())).thenReturn(stateDTO);
+
+        try (Response response = 
connectorResource.clearConnectorProcessorState(CONNECTOR_ID, PROCESSOR_ID, 
null)) {
+            assertEquals(200, response.getStatus());
+            final ComponentStateEntity entity = (ComponentStateEntity) 
response.getEntity();
+            assertEquals(PROCESSOR_ID, 
entity.getComponentState().getComponentId());
+        }
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        
verify(serviceFacade).verifyCanClearConnectorProcessorState(CONNECTOR_ID, 
PROCESSOR_ID);
+        verify(serviceFacade).clearConnectorProcessorState(eq(CONNECTOR_ID), 
eq(PROCESSOR_ID), any());
+    }
+
+    @Test
+    public void testClearConnectorProcessorStateNotAuthorized() {
+        
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+        assertThrows(AccessDeniedException.class, () ->
+            connectorResource.clearConnectorProcessorState(CONNECTOR_ID, 
PROCESSOR_ID, null));
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        verify(serviceFacade, 
never()).verifyCanClearConnectorProcessorState(anyString(), anyString());
+        verify(serviceFacade, 
never()).clearConnectorProcessorState(anyString(), anyString(), any());
+    }
+
+    @Test
+    public void testGetConnectorControllerServiceState() {
+        final ComponentStateDTO stateDTO = new ComponentStateDTO();
+        stateDTO.setComponentId(CONTROLLER_SERVICE_ID);
+        when(serviceFacade.getConnectorControllerServiceState(CONNECTOR_ID, 
CONTROLLER_SERVICE_ID)).thenReturn(stateDTO);
+
+        try (Response response = 
connectorResource.getConnectorControllerServiceState(CONNECTOR_ID, 
CONTROLLER_SERVICE_ID)) {
+            assertEquals(200, response.getStatus());
+            final ComponentStateEntity entity = (ComponentStateEntity) 
response.getEntity();
+            assertEquals(CONTROLLER_SERVICE_ID, 
entity.getComponentState().getComponentId());
+        }
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        verify(serviceFacade).getConnectorControllerServiceState(CONNECTOR_ID, 
CONTROLLER_SERVICE_ID);
+    }
+
+    @Test
+    public void testGetConnectorControllerServiceStateNotAuthorized() {
+        
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+        assertThrows(AccessDeniedException.class, () ->
+            connectorResource.getConnectorControllerServiceState(CONNECTOR_ID, 
CONTROLLER_SERVICE_ID));
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        verify(serviceFacade, 
never()).getConnectorControllerServiceState(anyString(), anyString());
+    }
+
+    @Test
+    public void testClearConnectorControllerServiceState() {
+        final ComponentStateDTO stateDTO = new ComponentStateDTO();
+        stateDTO.setComponentId(CONTROLLER_SERVICE_ID);
+        
when(serviceFacade.clearConnectorControllerServiceState(eq(CONNECTOR_ID), 
eq(CONTROLLER_SERVICE_ID), any())).thenReturn(stateDTO);
+
+        try (Response response = 
connectorResource.clearConnectorControllerServiceState(CONNECTOR_ID, 
CONTROLLER_SERVICE_ID, null)) {
+            assertEquals(200, response.getStatus());
+            final ComponentStateEntity entity = (ComponentStateEntity) 
response.getEntity();
+            assertEquals(CONTROLLER_SERVICE_ID, 
entity.getComponentState().getComponentId());
+        }
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        
verify(serviceFacade).verifyCanClearConnectorControllerServiceState(CONNECTOR_ID,
 CONTROLLER_SERVICE_ID);
+        
verify(serviceFacade).clearConnectorControllerServiceState(eq(CONNECTOR_ID), 
eq(CONTROLLER_SERVICE_ID), any());
+    }
+
+    @Test
+    public void testClearConnectorControllerServiceStateNotAuthorized() {
+        
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+        assertThrows(AccessDeniedException.class, () ->
+            
connectorResource.clearConnectorControllerServiceState(CONNECTOR_ID, 
CONTROLLER_SERVICE_ID, null));
+
+        verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+        verify(serviceFacade, 
never()).verifyCanClearConnectorControllerServiceState(anyString(), 
anyString());
+        verify(serviceFacade, 
never()).clearConnectorControllerServiceState(anyString(), anyString(), any());
+    }
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index d411ece527..05f9ed173c 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.client;
 
 import org.apache.nifi.web.api.entity.AssetEntity;
 import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -366,6 +367,50 @@ public interface ConnectorClient {
      */
     DropRequestEntity deletePurgeRequest(String connectorId, String 
purgeRequestId) throws NiFiClientException, IOException;
 
+    /**
+     * Gets the state for a processor within a connector.
+     *
+     * @param connectorId the connector ID
+     * @param processorId the processor ID
+     * @return the component state entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ComponentStateEntity getProcessorState(String connectorId, String 
processorId) throws NiFiClientException, IOException;
+
+    /**
+     * Clears the state for a processor within a connector.
+     *
+     * @param connectorId the connector ID
+     * @param processorId the processor ID
+     * @return the component state entity after clearing
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ComponentStateEntity clearProcessorState(String connectorId, String 
processorId) throws NiFiClientException, IOException;
+
+    /**
+     * Gets the state for a controller service within a connector.
+     *
+     * @param connectorId the connector ID
+     * @param controllerServiceId the controller service ID
+     * @return the component state entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ComponentStateEntity getControllerServiceState(String connectorId, String 
controllerServiceId) throws NiFiClientException, IOException;
+
+    /**
+     * Clears the state for a controller service within a connector.
+     *
+     * @param connectorId the connector ID
+     * @param controllerServiceId the controller service ID
+     * @return the component state entity after clearing
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ComponentStateEntity clearControllerServiceState(String connectorId, 
String controllerServiceId) throws NiFiClientException, IOException;
+
     /**
      * Indicates that mutable requests should indicate that the client has
      * acknowledged that the node is disconnected.
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index 06476292da..7b648e7183 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -27,6 +27,7 @@ import org.apache.nifi.toolkit.client.RequestConfig;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.entity.AssetEntity;
 import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -593,4 +594,80 @@ public class JerseyConnectorClient extends 
AbstractJerseyClient implements Conne
             return getRequestBuilder(target).delete(DropRequestEntity.class);
         });
     }
+
+    @Override
+    public ComponentStateEntity getProcessorState(final String connectorId, 
final String processorId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+        if (StringUtils.isBlank(processorId)) {
+            throw new IllegalArgumentException("Processor id cannot be null or 
blank");
+        }
+
+        return executeAction("Error retrieving processor state for Connector " 
+ connectorId, () -> {
+            final WebTarget target = connectorTarget
+                .path("/processors/{processorId}/state")
+                .resolveTemplate("id", connectorId)
+                .resolveTemplate("processorId", processorId);
+
+            return getRequestBuilder(target).get(ComponentStateEntity.class);
+        });
+    }
+
+    @Override
+    public ComponentStateEntity clearProcessorState(final String connectorId, 
final String processorId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+        if (StringUtils.isBlank(processorId)) {
+            throw new IllegalArgumentException("Processor id cannot be null or 
blank");
+        }
+
+        return executeAction("Error clearing processor state for Connector " + 
connectorId, () -> {
+            final WebTarget target = connectorTarget
+                .path("/processors/{processorId}/state/clear-requests")
+                .resolveTemplate("id", connectorId)
+                .resolveTemplate("processorId", processorId);
+
+            return getRequestBuilder(target).post(null, 
ComponentStateEntity.class);
+        });
+    }
+
+    @Override
+    public ComponentStateEntity getControllerServiceState(final String 
connectorId, final String controllerServiceId) throws NiFiClientException, 
IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+        if (StringUtils.isBlank(controllerServiceId)) {
+            throw new IllegalArgumentException("Controller service id cannot 
be null or blank");
+        }
+
+        return executeAction("Error retrieving controller service state for 
Connector " + connectorId, () -> {
+            final WebTarget target = connectorTarget
+                .path("/controller-services/{controllerServiceId}/state")
+                .resolveTemplate("id", connectorId)
+                .resolveTemplate("controllerServiceId", controllerServiceId);
+
+            return getRequestBuilder(target).get(ComponentStateEntity.class);
+        });
+    }
+
+    @Override
+    public ComponentStateEntity clearControllerServiceState(final String 
connectorId, final String controllerServiceId) throws NiFiClientException, 
IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+        if (StringUtils.isBlank(controllerServiceId)) {
+            throw new IllegalArgumentException("Controller service id cannot 
be null or blank");
+        }
+
+        return executeAction("Error clearing controller service state for 
Connector " + connectorId, () -> {
+            final WebTarget target = connectorTarget
+                
.path("/controller-services/{controllerServiceId}/state/clear-requests")
+                .resolveTemplate("id", connectorId)
+                .resolveTemplate("controllerServiceId", controllerServiceId);
+
+            return getRequestBuilder(target).post(null, 
ComponentStateEntity.class);
+        });
+    }
 }

Reply via email to