This is an automated email from the ASF dual-hosted git repository.
rfellows pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 3f981da5b3 NIFI-15549: Adding endpoints for viewing and clearing state
for compo… (#10855)
3f981da5b3 is described below
commit 3f981da5b378527c520a141440509501ed04cc31
Author: Matt Gilman <[email protected]>
AuthorDate: Thu Feb 5 12:05:53 2026 -0500
NIFI-15549: Adding endpoints for viewing and clearing state for compo…
(#10855)
* NIFI-15549: Adding endpoints for viewing and clearing state for
components that are managed by a Connector.
* NIFI-15549: Addressing review feedback.
---
.../endpoints/ComponentStateEndpointMerger.java | 6 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 53 +++++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 69 ++++++
.../org/apache/nifi/web/api/ConnectorResource.java | 240 ++++++++++++++++++++
.../nifi/web/StandardNiFiServiceFacadeTest.java | 250 +++++++++++++++++++++
.../apache/nifi/web/api/TestConnectorResource.java | 116 ++++++++++
.../nifi/toolkit/client/ConnectorClient.java | 45 ++++
.../toolkit/client/impl/JerseyConnectorClient.java | 77 +++++++
8 files changed, 855 insertions(+), 1 deletion(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
index 60686c5369..02e06b93d8 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
@@ -38,6 +38,8 @@ public class ComponentStateEndpointMerger extends
AbstractSingleDTOEndpoint<Comp
public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN =
Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state");
public static final Pattern REPORTING_TASK_STATE_URI_PATTERN =
Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state");
public static final Pattern FLOW_ANALYSIS_RULE_STATE_URI_PATTERN =
Pattern.compile("/nifi-api/controller/flow-analysis-rules/[a-f0-9\\-]{36}/state");
+ public static final Pattern CONNECTOR_PROCESSOR_STATE_URI_PATTERN =
Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/processors/[a-f0-9\\-]{36}/state");
+ public static final Pattern CONNECTOR_CONTROLLER_SERVICE_STATE_URI_PATTERN
=
Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/controller-services/[a-f0-9\\-]{36}/state");
@Override
public boolean canHandle(URI uri, String method) {
@@ -48,7 +50,9 @@ public class ComponentStateEndpointMerger extends
AbstractSingleDTOEndpoint<Comp
return PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
||
CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
||
REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
- ||
FLOW_ANALYSIS_RULE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
+ ||
FLOW_ANALYSIS_RULE_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
+ ||
CONNECTOR_PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
+ ||
CONNECTOR_CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index c350b2b983..29788ca773 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -2101,6 +2101,59 @@ public interface NiFiServiceFacade {
*/
ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId);
+ /**
+ * Gets the state for a processor within a connector's managed process
group.
+ *
+ * @param connectorId the connector id
+ * @param processorId the processor id
+ * @return the component state
+ */
+ ComponentStateDTO getConnectorProcessorState(String connectorId, String
processorId);
+
+ /**
+ * Verifies the processor state within a connector could be cleared.
+ *
+ * @param connectorId the connector id
+ * @param processorId the processor id
+ */
+ void verifyCanClearConnectorProcessorState(String connectorId, String
processorId);
+
+ /**
+ * Clears the state for a processor within a connector's managed process
group.
+ *
+ * @param connectorId the connector id
+ * @param processorId the processor id
+ * @param componentStateDTO the state of the processor
+ * @return the cleared component state
+ */
+ ComponentStateDTO clearConnectorProcessorState(String connectorId, String
processorId, ComponentStateDTO componentStateDTO);
+
+ /**
+ * Gets the state for a controller service within a connector's managed
process group.
+ *
+ * @param connectorId the connector id
+ * @param controllerServiceId the controller service id
+ * @return the component state
+ */
+ ComponentStateDTO getConnectorControllerServiceState(String connectorId,
String controllerServiceId);
+
+ /**
+ * Verifies the controller service state within a connector could be
cleared.
+ *
+ * @param connectorId the connector id
+ * @param controllerServiceId the controller service id
+ */
+ void verifyCanClearConnectorControllerServiceState(String connectorId,
String controllerServiceId);
+
+ /**
+ * Clears the state for a controller service within a connector's managed
process group.
+ *
+ * @param connectorId the connector id
+ * @param controllerServiceId the controller service id
+ * @param componentStateDTO the state of the controller service
+ * @return the cleared component state
+ */
+ ComponentStateDTO clearConnectorControllerServiceState(String connectorId,
String controllerServiceId, ComponentStateDTO componentStateDTO);
// ----------------------------------------
// Label methods
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 654644d552..1bfe0de934 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -415,6 +415,7 @@ import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
+import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
import org.apache.nifi.web.dao.ConnectorDAO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
@@ -497,6 +498,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
private BulletinRepository bulletinRepository;
// data access objects
+ private ComponentStateDAO componentStateDAO;
private ProcessorDAO processorDAO;
private ProcessGroupDAO processGroupDAO;
private RemoteProcessGroupDAO remoteProcessGroupDAO;
@@ -2099,6 +2101,68 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return dtoFactory.createComponentStateDTO(remoteProcessGroupId,
remoteProcessGroup.getClass(), localState, clusterState);
}
+ @Override
+ public ComponentStateDTO getConnectorProcessorState(final String
connectorId, final String processorId) {
+ final ProcessorNode processor = locateConnectorProcessor(connectorId,
processorId);
+ final StateMap clusterState = isClustered() ?
componentStateDAO.getState(processor, Scope.CLUSTER) : null;
+ final StateMap localState = componentStateDAO.getState(processor,
Scope.LOCAL);
+ return dtoFactory.createComponentStateDTO(processorId,
processor.getProcessor().getClass(), localState, clusterState);
+ }
+
+ @Override
+ public void verifyCanClearConnectorProcessorState(final String
connectorId, final String processorId) {
+ final ProcessorNode processor = locateConnectorProcessor(connectorId,
processorId);
+ processor.verifyCanClearState();
+ }
+
+ @Override
+ public ComponentStateDTO clearConnectorProcessorState(final String
connectorId, final String processorId, final ComponentStateDTO
componentStateDTO) {
+ final ProcessorNode processor = locateConnectorProcessor(connectorId,
processorId);
+ componentStateDAO.clearState(processor, componentStateDTO);
+ return getConnectorProcessorState(connectorId, processorId);
+ }
+
+ @Override
+ public ComponentStateDTO getConnectorControllerServiceState(final String
connectorId, final String controllerServiceId) {
+ final ControllerServiceNode controllerService =
locateConnectorControllerService(connectorId, controllerServiceId);
+ final StateMap clusterState = isClustered() ?
componentStateDAO.getState(controllerService, Scope.CLUSTER) : null;
+ final StateMap localState =
componentStateDAO.getState(controllerService, Scope.LOCAL);
+ return dtoFactory.createComponentStateDTO(controllerServiceId,
controllerService.getControllerServiceImplementation().getClass(), localState,
clusterState);
+ }
+
+ @Override
+ public void verifyCanClearConnectorControllerServiceState(final String
connectorId, final String controllerServiceId) {
+ final ControllerServiceNode controllerService =
locateConnectorControllerService(connectorId, controllerServiceId);
+ controllerService.verifyCanClearState();
+ }
+
+ @Override
+ public ComponentStateDTO clearConnectorControllerServiceState(final String
connectorId, final String controllerServiceId, final ComponentStateDTO
componentStateDTO) {
+ final ControllerServiceNode controllerService =
locateConnectorControllerService(connectorId, controllerServiceId);
+ componentStateDAO.clearState(controllerService, componentStateDTO);
+ return getConnectorControllerServiceState(connectorId,
controllerServiceId);
+ }
+
+ private ProcessorNode locateConnectorProcessor(final String connectorId,
final String processorId) {
+ final ConnectorNode connectorNode =
connectorDAO.getConnector(connectorId);
+ final ProcessGroup managedGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+ final ProcessorNode processor =
managedGroup.findProcessor(processorId);
+ if (processor == null) {
+ throw new ResourceNotFoundException("Unable to find processor with
id '%s' within connector '%s'.".formatted(processorId, connectorId));
+ }
+ return processor;
+ }
+
+ private ControllerServiceNode locateConnectorControllerService(final
String connectorId, final String controllerServiceId) {
+ final ConnectorNode connectorNode =
connectorDAO.getConnector(connectorId);
+ final ProcessGroup managedGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+ final ControllerServiceNode controllerService =
managedGroup.findControllerService(controllerServiceId, false, true);
+ if (controllerService == null) {
+ throw new ResourceNotFoundException("Unable to find controller
service with id '%s' within connector '%s'.".formatted(controllerServiceId,
connectorId));
+ }
+ return controllerService;
+ }
+
@Override
public ConnectionEntity deleteConnection(final Revision revision, final
String connectionId) {
final Connection connection =
connectionDAO.getConnection(connectionId);
@@ -7800,6 +7864,11 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
this.parameterProviderDAO = parameterProviderDAO;
}
+ @Autowired
+ public void setComponentStateDAO(final ComponentStateDAO
componentStateDAO) {
+ this.componentStateDAO = componentStateDAO;
+ }
+
@Autowired
public void setParameterContextDAO(final ParameterContextDAO
parameterContextDAO) {
this.parameterContextDAO = parameterContextDAO;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 00a44a0575..1f4b0a9932 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -74,6 +74,7 @@ import org.apache.nifi.web.api.concurrent.StandardUpdateStep;
import org.apache.nifi.web.api.concurrent.UpdateStep;
import org.apache.nifi.web.api.dto.AssetDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
@@ -82,6 +83,7 @@ import
org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -2146,6 +2148,244 @@ public class ConnectorResource extends
ApplicationResource {
.build();
}
+ // -----------------
+ // Processor State
+ // -----------------
+
+ /**
+ * Gets the state for a processor within a connector.
+ *
+ * @param connectorId the connector id
+ * @param processorId the processor id
+ * @return a ComponentStateEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/processors/{processorId}/state")
+ @Operation(
+ summary = "Gets the state for a processor within a connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response getConnectorProcessorState(
+ @Parameter(description = "The connector id.", required = true)
+ @PathParam("id") final String connectorId,
+ @Parameter(description = "The processor id.", required = true)
+ @PathParam("processorId") final String processorId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable connector = lookup.getConnector(connectorId);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final ComponentStateDTO state =
serviceFacade.getConnectorProcessorState(connectorId, processorId);
+
+ final ComponentStateEntity entity = new ComponentStateEntity();
+ entity.setComponentState(state);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Clears the state for a processor within a connector.
+ *
+ * @param connectorId the connector id
+ * @param processorId the processor id
+ * @return a ComponentStateEntity
+ */
+ @POST
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/processors/{processorId}/state/clear-requests")
+ @Operation(
+ summary = "Clears the state for a processor within a connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response clearConnectorProcessorState(
+ @Parameter(description = "The connector id.", required = true)
+ @PathParam("id") final String connectorId,
+ @Parameter(description = "The processor id.", required = true)
+ @PathParam("processorId") final String processorId,
+ @Parameter(description = "Optional component state to perform a
selective key removal. If omitted, clears all state.", required = false)
+ final ComponentStateEntity componentStateEntity) {
+
+ if (isReplicateRequest()) {
+ if (componentStateEntity == null) {
+ return replicate(HttpMethod.POST);
+ } else {
+ return replicate(HttpMethod.POST, componentStateEntity);
+ }
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(connectorId);
+
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ lookup -> {
+ final Authorizable connector =
lookup.getConnector(connectorId);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () ->
serviceFacade.verifyCanClearConnectorProcessorState(connectorId, processorId),
+ (connectorEntity) -> {
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearConnectorProcessorState(connectorEntity.getId(),
processorId, expectedState);
+
+ // generate the response entity
+ final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
+
+ // generate the response
+ return generateOkResponse(entity).build();
+ }
+ );
+ }
+
+ // -----------------
+ // Controller Service State
+ // -----------------
+
+ /**
+ * Gets the state for a controller service within a connector.
+ *
+ * @param connectorId the connector id
+ * @param controllerServiceId the controller service id
+ * @return a ComponentStateEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/controller-services/{controllerServiceId}/state")
+ @Operation(
+ summary = "Gets the state for a controller service within a
connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response getConnectorControllerServiceState(
+ @Parameter(description = "The connector id.", required = true)
+ @PathParam("id") final String connectorId,
+ @Parameter(description = "The controller service id.", required =
true)
+ @PathParam("controllerServiceId") final String
controllerServiceId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable connector = lookup.getConnector(connectorId);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final ComponentStateDTO state =
serviceFacade.getConnectorControllerServiceState(connectorId,
controllerServiceId);
+
+ final ComponentStateEntity entity = new ComponentStateEntity();
+ entity.setComponentState(state);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Clears the state for a controller service within a connector.
+ *
+ * @param connectorId the connector id
+ * @param controllerServiceId the controller service id
+ * @return a ComponentStateEntity
+ */
+ @POST
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
+ @Produces(MediaType.APPLICATION_JSON)
+
@Path("{id}/controller-services/{controllerServiceId}/state/clear-requests")
+ @Operation(
+ summary = "Clears the state for a controller service within a
connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = ComponentStateEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response clearConnectorControllerServiceState(
+ @Parameter(description = "The connector id.", required = true)
+ @PathParam("id") final String connectorId,
+ @Parameter(description = "The controller service id.", required =
true)
+ @PathParam("controllerServiceId") final String controllerServiceId,
+ @Parameter(description = "Optional component state to perform a
selective key removal. If omitted, clears all state.", required = false)
+ final ComponentStateEntity componentStateEntity) {
+
+ if (isReplicateRequest()) {
+ if (componentStateEntity == null) {
+ return replicate(HttpMethod.POST);
+ } else {
+ return replicate(HttpMethod.POST, componentStateEntity);
+ }
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(connectorId);
+
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ lookup -> {
+ final Authorizable connector =
lookup.getConnector(connectorId);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () ->
serviceFacade.verifyCanClearConnectorControllerServiceState(connectorId,
controllerServiceId),
+ (connectorEntity) -> {
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearConnectorControllerServiceState(connectorEntity.getId(),
controllerServiceId, expectedState);
+
+ // generate the response entity
+ final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
+
+ // generate the response
+ return generateOkResponse(entity).build();
+ }
+ );
+ }
+
// -----------------
// setters
// -----------------
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 1779e797f2..a754520732 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -40,6 +40,9 @@ import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.components.connector.Secret;
import org.apache.nifi.components.connector.secrets.AuthorizableSecret;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
@@ -67,6 +70,7 @@ import org.apache.nifi.groups.VersionedComponentAdditions;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.flow.FlowRegistryUtil;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -89,6 +93,7 @@ import org.apache.nifi.util.MockBulletinRepository;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.validation.RuleViolation;
import org.apache.nifi.validation.RuleViolationsManager;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
@@ -112,6 +117,7 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.TenantsEntity;
import org.apache.nifi.web.controller.ControllerFacade;
+import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ConnectorDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@@ -1779,4 +1785,248 @@ public class StandardNiFiServiceFacadeTest {
assertNotNull(result.getSecrets());
assertTrue(result.getSecrets().isEmpty());
}
+
+ // -----------------
+ // Connector State Tests
+ // -----------------
+
+ @Test
+ public void testGetConnectorProcessorState() {
+ final String connectorId = "connector-id";
+ final String processorId = "processor-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ final ComponentStateDAO componentStateDAO =
mock(ComponentStateDAO.class);
+ final DtoFactory dtoFactory = mock(DtoFactory.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+ serviceFacade.setComponentStateDAO(componentStateDAO);
+ serviceFacade.setDtoFactory(dtoFactory);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+ final ProcessorNode processorNode = mock(ProcessorNode.class);
+ final Processor processor = mock(Processor.class);
+ final StateMap localStateMap = mock(StateMap.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+
when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode);
+ when(processorNode.getProcessor()).thenReturn(processor);
+ when(componentStateDAO.getState(processorNode,
Scope.LOCAL)).thenReturn(localStateMap);
+
+ final ComponentStateDTO expectedDto = new ComponentStateDTO();
+ expectedDto.setComponentId(processorId);
+ when(dtoFactory.createComponentStateDTO(eq(processorId),
eq(processor.getClass()), eq(localStateMap), any())).thenReturn(expectedDto);
+
+ final ComponentStateDTO result =
serviceFacade.getConnectorProcessorState(connectorId, processorId);
+
+ assertNotNull(result);
+ assertEquals(processorId, result.getComponentId());
+ verify(connectorDAO).getConnector(connectorId);
+ verify(managedProcessGroup).findProcessor(processorId);
+ verify(componentStateDAO).getState(processorNode, Scope.LOCAL);
+ }
+
+ @Test
+ public void testGetConnectorProcessorStateNotFound() {
+ final String connectorId = "connector-id";
+ final String processorId = "non-existent-processor-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+ when(managedProcessGroup.findProcessor(processorId)).thenReturn(null);
+
+ assertThrows(ResourceNotFoundException.class, () ->
serviceFacade.getConnectorProcessorState(connectorId, processorId));
+ }
+
+ @Test
+ public void testVerifyCanClearConnectorProcessorState() {
+ final String connectorId = "connector-id";
+ final String processorId = "processor-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+ final ProcessorNode processorNode = mock(ProcessorNode.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+
when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode);
+
+ // Should not throw
+ serviceFacade.verifyCanClearConnectorProcessorState(connectorId,
processorId);
+
+ verify(processorNode).verifyCanClearState();
+ }
+
+ @Test
+ public void testClearConnectorProcessorState() {
+ final String connectorId = "connector-id";
+ final String processorId = "processor-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ final ComponentStateDAO componentStateDAO =
mock(ComponentStateDAO.class);
+ final DtoFactory dtoFactory = mock(DtoFactory.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+ serviceFacade.setComponentStateDAO(componentStateDAO);
+ serviceFacade.setDtoFactory(dtoFactory);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+ final ProcessorNode processorNode = mock(ProcessorNode.class);
+ final Processor processor = mock(Processor.class);
+ final StateMap localStateMap = mock(StateMap.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+
when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode);
+ when(processorNode.getProcessor()).thenReturn(processor);
+ when(componentStateDAO.getState(processorNode,
Scope.LOCAL)).thenReturn(localStateMap);
+
+ final ComponentStateDTO expectedDto = new ComponentStateDTO();
+ expectedDto.setComponentId(processorId);
+ when(dtoFactory.createComponentStateDTO(eq(processorId),
eq(processor.getClass()), eq(localStateMap), any())).thenReturn(expectedDto);
+
+ final ComponentStateDTO result =
serviceFacade.clearConnectorProcessorState(connectorId, processorId, null);
+
+ assertNotNull(result);
+ assertEquals(processorId, result.getComponentId());
+ verify(componentStateDAO).clearState(processorNode, null);
+ }
+
+ @Test
+ public void testGetConnectorControllerServiceState() {
+ final String connectorId = "connector-id";
+ final String controllerServiceId = "controller-service-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ final ComponentStateDAO componentStateDAO =
mock(ComponentStateDAO.class);
+ final DtoFactory dtoFactory = mock(DtoFactory.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+ serviceFacade.setComponentStateDAO(componentStateDAO);
+ serviceFacade.setDtoFactory(dtoFactory);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+ final ControllerServiceNode controllerServiceNode =
mock(ControllerServiceNode.class);
+ final ControllerService controllerService =
mock(ControllerService.class);
+ final StateMap localStateMap = mock(StateMap.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+ when(managedProcessGroup.findControllerService(controllerServiceId,
false, true)).thenReturn(controllerServiceNode);
+
when(controllerServiceNode.getControllerServiceImplementation()).thenReturn(controllerService);
+ when(componentStateDAO.getState(controllerServiceNode,
Scope.LOCAL)).thenReturn(localStateMap);
+
+ final ComponentStateDTO expectedDto = new ComponentStateDTO();
+ expectedDto.setComponentId(controllerServiceId);
+ when(dtoFactory.createComponentStateDTO(eq(controllerServiceId),
eq(controllerService.getClass()), eq(localStateMap),
any())).thenReturn(expectedDto);
+
+ final ComponentStateDTO result =
serviceFacade.getConnectorControllerServiceState(connectorId,
controllerServiceId);
+
+ assertNotNull(result);
+ assertEquals(controllerServiceId, result.getComponentId());
+ verify(connectorDAO).getConnector(connectorId);
+ verify(managedProcessGroup).findControllerService(controllerServiceId,
false, true);
+ verify(componentStateDAO).getState(controllerServiceNode, Scope.LOCAL);
+ }
+
+ @Test
+ public void testGetConnectorControllerServiceStateNotFound() {
+ final String connectorId = "connector-id";
+ final String controllerServiceId =
"non-existent-controller-service-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+ when(managedProcessGroup.findControllerService(controllerServiceId,
false, true)).thenReturn(null);
+
+ assertThrows(ResourceNotFoundException.class, () ->
serviceFacade.getConnectorControllerServiceState(connectorId,
controllerServiceId));
+ }
+
+ @Test
+ public void testVerifyCanClearConnectorControllerServiceState() {
+ final String connectorId = "connector-id";
+ final String controllerServiceId = "controller-service-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+ final ControllerServiceNode controllerServiceNode =
mock(ControllerServiceNode.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+ when(managedProcessGroup.findControllerService(controllerServiceId,
false, true)).thenReturn(controllerServiceNode);
+
+ // Should not throw
+
serviceFacade.verifyCanClearConnectorControllerServiceState(connectorId,
controllerServiceId);
+
+ verify(controllerServiceNode).verifyCanClearState();
+ }
+
+ @Test
+ public void testClearConnectorControllerServiceState() {
+ final String connectorId = "connector-id";
+ final String controllerServiceId = "controller-service-id";
+
+ final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
+ final ComponentStateDAO componentStateDAO =
mock(ComponentStateDAO.class);
+ final DtoFactory dtoFactory = mock(DtoFactory.class);
+ serviceFacade.setConnectorDAO(connectorDAO);
+ serviceFacade.setComponentStateDAO(componentStateDAO);
+ serviceFacade.setDtoFactory(dtoFactory);
+
+ final ConnectorNode connectorNode = mock(ConnectorNode.class);
+ final FrameworkFlowContext flowContext =
mock(FrameworkFlowContext.class);
+ final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
+ final ControllerServiceNode controllerServiceNode =
mock(ControllerServiceNode.class);
+ final ControllerService controllerService =
mock(ControllerService.class);
+ final StateMap localStateMap = mock(StateMap.class);
+
+ when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode);
+ when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
+
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
+ when(managedProcessGroup.findControllerService(controllerServiceId,
false, true)).thenReturn(controllerServiceNode);
+
when(controllerServiceNode.getControllerServiceImplementation()).thenReturn(controllerService);
+ when(componentStateDAO.getState(controllerServiceNode,
Scope.LOCAL)).thenReturn(localStateMap);
+
+ final ComponentStateDTO expectedDto = new ComponentStateDTO();
+ expectedDto.setComponentId(controllerServiceId);
+ when(dtoFactory.createComponentStateDTO(eq(controllerServiceId),
eq(controllerService.getClass()), eq(localStateMap),
any())).thenReturn(expectedDto);
+
+ final ComponentStateDTO result =
serviceFacade.clearConnectorControllerServiceState(connectorId,
controllerServiceId, null);
+
+ assertNotNull(result);
+ assertEquals(controllerServiceId, result.getComponentId());
+ verify(componentStateDAO).clearState(controllerServiceNode, null);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
index 9ec1afb42c..5d50471cf9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
@@ -29,10 +29,12 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AllowableValueDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.entity.AllowableValueEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
@@ -104,6 +106,8 @@ public class TestConnectorResource {
private static final String PROPERTY_GROUP_NAME = "test-group";
private static final String PROPERTY_NAME = "test-property";
private static final String PROCESS_GROUP_ID = "test-process-group-id";
+ private static final String PROCESSOR_ID = "test-processor-id";
+ private static final String CONTROLLER_SERVICE_ID =
"test-controller-service-id";
@BeforeEach
public void setUp() throws Exception {
@@ -629,4 +633,116 @@ public class TestConnectorResource {
verify(serviceFacade, never()).createConnector(any(Revision.class),
any(ConnectorDTO.class));
}
+
+ @Test
+ public void testGetConnectorProcessorState() {
+ final ComponentStateDTO stateDTO = new ComponentStateDTO();
+ stateDTO.setComponentId(PROCESSOR_ID);
+ when(serviceFacade.getConnectorProcessorState(CONNECTOR_ID,
PROCESSOR_ID)).thenReturn(stateDTO);
+
+ try (Response response =
connectorResource.getConnectorProcessorState(CONNECTOR_ID, PROCESSOR_ID)) {
+ assertEquals(200, response.getStatus());
+ final ComponentStateEntity entity = (ComponentStateEntity)
response.getEntity();
+ assertEquals(PROCESSOR_ID,
entity.getComponentState().getComponentId());
+ }
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+ verify(serviceFacade).getConnectorProcessorState(CONNECTOR_ID,
PROCESSOR_ID);
+ }
+
+ @Test
+ public void testGetConnectorProcessorStateNotAuthorized() {
+
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+ assertThrows(AccessDeniedException.class, () ->
+ connectorResource.getConnectorProcessorState(CONNECTOR_ID,
PROCESSOR_ID));
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+ verify(serviceFacade, never()).getConnectorProcessorState(anyString(),
anyString());
+ }
+
+ @Test
+ public void testClearConnectorProcessorState() {
+ final ComponentStateDTO stateDTO = new ComponentStateDTO();
+ stateDTO.setComponentId(PROCESSOR_ID);
+ when(serviceFacade.clearConnectorProcessorState(eq(CONNECTOR_ID),
eq(PROCESSOR_ID), any())).thenReturn(stateDTO);
+
+ try (Response response =
connectorResource.clearConnectorProcessorState(CONNECTOR_ID, PROCESSOR_ID,
null)) {
+ assertEquals(200, response.getStatus());
+ final ComponentStateEntity entity = (ComponentStateEntity)
response.getEntity();
+ assertEquals(PROCESSOR_ID,
entity.getComponentState().getComponentId());
+ }
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
verify(serviceFacade).verifyCanClearConnectorProcessorState(CONNECTOR_ID,
PROCESSOR_ID);
+ verify(serviceFacade).clearConnectorProcessorState(eq(CONNECTOR_ID),
eq(PROCESSOR_ID), any());
+ }
+
+ @Test
+ public void testClearConnectorProcessorStateNotAuthorized() {
+
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+ assertThrows(AccessDeniedException.class, () ->
+ connectorResource.clearConnectorProcessorState(CONNECTOR_ID,
PROCESSOR_ID, null));
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+ verify(serviceFacade,
never()).verifyCanClearConnectorProcessorState(anyString(), anyString());
+ verify(serviceFacade,
never()).clearConnectorProcessorState(anyString(), anyString(), any());
+ }
+
+ @Test
+ public void testGetConnectorControllerServiceState() {
+ final ComponentStateDTO stateDTO = new ComponentStateDTO();
+ stateDTO.setComponentId(CONTROLLER_SERVICE_ID);
+ when(serviceFacade.getConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID)).thenReturn(stateDTO);
+
+ try (Response response =
connectorResource.getConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID)) {
+ assertEquals(200, response.getStatus());
+ final ComponentStateEntity entity = (ComponentStateEntity)
response.getEntity();
+ assertEquals(CONTROLLER_SERVICE_ID,
entity.getComponentState().getComponentId());
+ }
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+ verify(serviceFacade).getConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID);
+ }
+
+ @Test
+ public void testGetConnectorControllerServiceStateNotAuthorized() {
+
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+ assertThrows(AccessDeniedException.class, () ->
+ connectorResource.getConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID));
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+ verify(serviceFacade,
never()).getConnectorControllerServiceState(anyString(), anyString());
+ }
+
+ @Test
+ public void testClearConnectorControllerServiceState() {
+ final ComponentStateDTO stateDTO = new ComponentStateDTO();
+ stateDTO.setComponentId(CONTROLLER_SERVICE_ID);
+
when(serviceFacade.clearConnectorControllerServiceState(eq(CONNECTOR_ID),
eq(CONTROLLER_SERVICE_ID), any())).thenReturn(stateDTO);
+
+ try (Response response =
connectorResource.clearConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID, null)) {
+ assertEquals(200, response.getStatus());
+ final ComponentStateEntity entity = (ComponentStateEntity)
response.getEntity();
+ assertEquals(CONTROLLER_SERVICE_ID,
entity.getComponentState().getComponentId());
+ }
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
verify(serviceFacade).verifyCanClearConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID);
+
verify(serviceFacade).clearConnectorControllerServiceState(eq(CONNECTOR_ID),
eq(CONTROLLER_SERVICE_ID), any());
+ }
+
+ @Test
+ public void testClearConnectorControllerServiceStateNotAuthorized() {
+
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+ assertThrows(AccessDeniedException.class, () ->
+
connectorResource.clearConnectorControllerServiceState(CONNECTOR_ID,
CONTROLLER_SERVICE_ID, null));
+
+ verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+ verify(serviceFacade,
never()).verifyCanClearConnectorControllerServiceState(anyString(),
anyString());
+ verify(serviceFacade,
never()).clearConnectorControllerServiceState(anyString(), anyString(), any());
+ }
}
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index d411ece527..05f9ed173c 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.client;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -366,6 +367,50 @@ public interface ConnectorClient {
*/
DropRequestEntity deletePurgeRequest(String connectorId, String
purgeRequestId) throws NiFiClientException, IOException;
+ /**
+ * Gets the state for a processor within a connector.
+ *
+ * @param connectorId the connector ID
+ * @param processorId the processor ID
+ * @return the component state entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ComponentStateEntity getProcessorState(String connectorId, String
processorId) throws NiFiClientException, IOException;
+
+ /**
+ * Clears the state for a processor within a connector.
+ *
+ * @param connectorId the connector ID
+ * @param processorId the processor ID
+ * @return the component state entity after clearing
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ComponentStateEntity clearProcessorState(String connectorId, String
processorId) throws NiFiClientException, IOException;
+
+ /**
+ * Gets the state for a controller service within a connector.
+ *
+ * @param connectorId the connector ID
+ * @param controllerServiceId the controller service ID
+ * @return the component state entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ComponentStateEntity getControllerServiceState(String connectorId, String
controllerServiceId) throws NiFiClientException, IOException;
+
+ /**
+ * Clears the state for a controller service within a connector.
+ *
+ * @param connectorId the connector ID
+ * @param controllerServiceId the controller service ID
+ * @return the component state entity after clearing
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ComponentStateEntity clearControllerServiceState(String connectorId,
String controllerServiceId) throws NiFiClientException, IOException;
+
/**
* Indicates that mutable requests should indicate that the client has
* acknowledged that the node is disconnected.
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index 06476292da..7b648e7183 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -27,6 +27,7 @@ import org.apache.nifi.toolkit.client.RequestConfig;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -593,4 +594,80 @@ public class JerseyConnectorClient extends
AbstractJerseyClient implements Conne
return getRequestBuilder(target).delete(DropRequestEntity.class);
});
}
+
+ @Override
+ public ComponentStateEntity getProcessorState(final String connectorId,
final String processorId) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+ if (StringUtils.isBlank(processorId)) {
+ throw new IllegalArgumentException("Processor id cannot be null or
blank");
+ }
+
+ return executeAction("Error retrieving processor state for Connector "
+ connectorId, () -> {
+ final WebTarget target = connectorTarget
+ .path("/processors/{processorId}/state")
+ .resolveTemplate("id", connectorId)
+ .resolveTemplate("processorId", processorId);
+
+ return getRequestBuilder(target).get(ComponentStateEntity.class);
+ });
+ }
+
+ @Override
+ public ComponentStateEntity clearProcessorState(final String connectorId,
final String processorId) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+ if (StringUtils.isBlank(processorId)) {
+ throw new IllegalArgumentException("Processor id cannot be null or
blank");
+ }
+
+ return executeAction("Error clearing processor state for Connector " +
connectorId, () -> {
+ final WebTarget target = connectorTarget
+ .path("/processors/{processorId}/state/clear-requests")
+ .resolveTemplate("id", connectorId)
+ .resolveTemplate("processorId", processorId);
+
+ return getRequestBuilder(target).post(null,
ComponentStateEntity.class);
+ });
+ }
+
+ @Override
+ public ComponentStateEntity getControllerServiceState(final String
connectorId, final String controllerServiceId) throws NiFiClientException,
IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+ if (StringUtils.isBlank(controllerServiceId)) {
+ throw new IllegalArgumentException("Controller service id cannot
be null or blank");
+ }
+
+ return executeAction("Error retrieving controller service state for
Connector " + connectorId, () -> {
+ final WebTarget target = connectorTarget
+ .path("/controller-services/{controllerServiceId}/state")
+ .resolveTemplate("id", connectorId)
+ .resolveTemplate("controllerServiceId", controllerServiceId);
+
+ return getRequestBuilder(target).get(ComponentStateEntity.class);
+ });
+ }
+
+ @Override
+ public ComponentStateEntity clearControllerServiceState(final String
connectorId, final String controllerServiceId) throws NiFiClientException,
IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+ if (StringUtils.isBlank(controllerServiceId)) {
+ throw new IllegalArgumentException("Controller service id cannot
be null or blank");
+ }
+
+ return executeAction("Error clearing controller service state for
Connector " + connectorId, () -> {
+ final WebTarget target = connectorTarget
+
.path("/controller-services/{controllerServiceId}/state/clear-requests")
+ .resolveTemplate("id", connectorId)
+ .resolveTemplate("controllerServiceId", controllerServiceId);
+
+ return getRequestBuilder(target).post(null,
ComponentStateEntity.class);
+ });
+ }
}