This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 135d3f0d70a7b49c6189db084f400cb0ef748d27 Author: Mark Payne <[email protected]> AuthorDate: Wed Jan 21 14:20:07 2026 -0500 NIFI-15461: Added ability to initiate drainage of Connector's FlowFiles and cancel it; added tests to verify; some bug fixes (#10767) --- .../http/StandardHttpResponseMapper.java | 2 + .../endpoints/ConnectorStatusEndpointMerger.java | 86 ++++++++ .../ClusteredConnectorRequestReplicator.java | 22 +- .../cluster/manager/ConnectorEntityMerger.java | 29 ++- .../apache/nifi/cluster/manager/StatusMerger.java | 15 ++ .../cluster/manager/ConnectorEntityMergerTest.java | 30 +++ .../nifi/components/connector/ConnectorNode.java | 12 ++ .../connector/StandardConnectorNode.java | 83 +++++++- .../connector/StandardConnectorRepository.java | 22 +- .../components/connector/StandardFlowContext.java | 5 + .../connector/TestStandardConnectorNode.java | 118 +++++++++++ .../org/apache/nifi/web/NiFiServiceFacade.java | 8 + .../apache/nifi/web/StandardNiFiServiceFacade.java | 57 ++++++ .../org/apache/nifi/web/api/ConnectorResource.java | 149 ++++++++++++++ .../java/org/apache/nifi/web/dao/ConnectorDAO.java | 6 + .../nifi/web/dao/impl/StandardConnectorDAO.java | 18 ++ .../apache/nifi/web/api/TestConnectorResource.java | 62 +++++- .../tests/system/DataQueuingConnector.java | 3 +- ...nnector.java => GatedDataQueuingConnector.java} | 73 ++++++- .../processors/tests/system/TerminateFlowFile.java | 28 +++ .../org.apache.nifi.components.connector.Connector | 1 + .../apache/nifi/tests/system/NiFiClientUtil.java | 24 +++ .../org/apache/nifi/tests/system/NiFiSystemIT.java | 23 +++ .../connectors/ClusteredConnectorDrainIT.java | 228 +++++++++++++++++++++ .../tests/system/connectors/ConnectorDrainIT.java | 178 ++++++++++++++++ .../nifi/toolkit/client/ConnectorClient.java | 44 ++++ .../toolkit/client/impl/JerseyConnectorClient.java | 69 +++++++ 27 files changed, 1357 insertions(+), 38 deletions(-) diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 0eefb8b763..efa017ae5f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -30,6 +30,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorEndpointMerg import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorFlowEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorPropertyGroupEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorPropertyGroupNamesEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorStatusEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger; @@ -146,6 +147,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { endpointMergers.add(new ProcessorRunStatusDetailsEndpointMerger()); endpointMergers.add(new ConnectorEndpointMerger()); endpointMergers.add(new ConnectorsEndpointMerger()); + endpointMergers.add(new ConnectorStatusEndpointMerger()); endpointMergers.add(new ConnectorFlowEndpointMerger()); endpointMergers.add(new ConnectorPropertyGroupEndpointMerger()); endpointMergers.add(new ConnectorPropertyGroupNamesEndpointMerger()); diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java new file mode 100644 index 0000000000..bff04b3fe7 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.ComponentEntityStatusMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ConnectorStatusEndpointMerger extends AbstractSingleEntityEndpoint<ProcessGroupStatusEntity> implements ComponentEntityStatusMerger<ProcessGroupStatusDTO> { + public static final Pattern CONNECTOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/status"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONNECTOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ProcessGroupStatusEntity> getEntityClass() { + return ProcessGroupStatusEntity.class; + } + + @Override + protected void mergeResponses(final ProcessGroupStatusEntity clientEntity, final Map<NodeIdentifier, ProcessGroupStatusEntity> entityMap, + final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) { + final ProcessGroupStatusDTO mergedProcessGroupStatus = clientEntity.getProcessGroupStatus(); + mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<>()); + + final NodeIdentifier selectedNodeId = entityMap.entrySet().stream() + .filter(e -> e.getValue() == clientEntity) + .map(Map.Entry::getKey) + .findFirst() + .orElse(null); + + final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + 
selectedNodeSnapshot.setStatusSnapshot(mergedProcessGroupStatus.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, ProcessGroupStatusEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessGroupStatusEntity nodeProcessGroupStatusEntity = entry.getValue(); + final ProcessGroupStatusDTO nodeProcessGroupStatus = nodeProcessGroupStatusEntity.getProcessGroupStatus(); + if (nodeProcessGroupStatus == mergedProcessGroupStatus) { + continue; + } + + mergeStatus(mergedProcessGroupStatus, clientEntity.getCanRead(), nodeProcessGroupStatus, nodeProcessGroupStatusEntity.getCanRead(), nodeId); + } + } + + @Override + public void mergeStatus(final ProcessGroupStatusDTO clientStatus, final boolean clientStatusReadablePermission, + final ProcessGroupStatusDTO status, final boolean statusReadablePermission, + final NodeIdentifier statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, + statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), statusNodeIdentifier.getApiPort()); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java index 8fdc58dd35..a3db59cf4f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java @@ -24,11 +24,13 @@ import jakarta.ws.rs.core.Response.StatusType; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.components.connector.ConnectorRequestReplicator; import org.apache.nifi.components.connector.ConnectorState; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.apache.nifi.web.api.entity.Entity; import java.io.IOException; import java.net.URI; @@ -55,14 +57,26 @@ public class ClusteredConnectorRequestReplicator implements ConnectorRequestRepl public ConnectorState getState(final String connectorId) throws IOException { final RequestReplicator requestReplicator = getRequestReplicator(); final NiFiUser nodeUser = getNodeUser(); - final AsyncClusterResponse asyncResponse = requestReplicator.replicate(nodeUser, GET, URI.create(replicationScheme + "://localhost/nifi-api/connectors/" + connectorId), Map.of(), Map.of()); + final URI uri = URI.create(replicationScheme + "://localhost/nifi-api/connectors/" + connectorId); + final AsyncClusterResponse asyncResponse = requestReplicator.replicate(nodeUser, GET, uri, Map.of(), Map.of()); + try { - final Response response = asyncResponse.awaitMergedResponse().getClientResponse(); + final NodeResponse mergedNodeResponse = asyncResponse.awaitMergedResponse(); + final Response response = mergedNodeResponse.getClientResponse(); verifyResponse(response.getStatusInfo(), connectorId); - final ConnectorEntity connectorEntity = response.readEntity(ConnectorEntity.class); - final String stateName = 
connectorEntity.getComponent().getState(); + // Use the merged/updated entity if available, otherwise fall back to reading from the raw response. + // The updatedEntity contains the properly merged state from all nodes, while readEntity() would + // only return the state from whichever single node was selected as the "client response". + final ConnectorEntity connectorEntity; + final Entity updatedEntity = mergedNodeResponse.getUpdatedEntity(); + if (updatedEntity instanceof ConnectorEntity mergedConnectorEntity) { + connectorEntity = mergedConnectorEntity; + } else { + connectorEntity = response.readEntity(ConnectorEntity.class); + } + final String stateName = connectorEntity.getComponent().getState(); try { return ConnectorState.valueOf(stateName); } catch (final IllegalArgumentException e) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java index 5f880dfc3f..7ef7fdf76b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.components.connector.ConnectorState; import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO; import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO; import org.apache.nifi.web.api.dto.ConnectorDTO; @@ -26,7 +27,6 @@ import java.util.HashMap; import java.util.Map; public class ConnectorEntityMerger { - /** * Merges the ConnectorEntity responses. 
* @@ -43,6 +43,16 @@ public class ConnectorEntityMerger { } mergeDtos(clientDto, dtoMap); + + mergeStatus(clientEntity, entityMap); + } + + private static void mergeStatus(final ConnectorEntity clientEntity, final Map<NodeIdentifier, ConnectorEntity> entityMap) { + for (final ConnectorEntity nodeEntity : entityMap.values()) { + if (nodeEntity != clientEntity && nodeEntity != null) { + StatusMerger.merge(clientEntity.getStatus(), nodeEntity.getStatus()); + } + } } private static void mergeDtos(final ConnectorDTO clientDto, final Map<NodeIdentifier, ConnectorDTO> dtoMap) { @@ -89,13 +99,16 @@ public class ConnectorEntityMerger { if (state == null) { return 0; } - return switch (state) { - case "UPDATE_FAILED" -> 6; - case "PREPARING_FOR_UPDATE" -> 5; - case "UPDATING" -> 4; - case "UPDATED" -> 3; - case "STARTING", "STOPPING" -> 2; - default -> 1; // RUNNING, STOPPED, DISABLED + + final ConnectorState connectorState = ConnectorState.valueOf(state); + return switch (connectorState) { + case UPDATE_FAILED -> 7; + case PREPARING_FOR_UPDATE -> 6; + case UPDATING -> 5; + case DRAINING, PURGING -> 4; + case UPDATED -> 3; + case STARTING, STOPPING -> 2; + default -> 1; // RUNNING, STOPPED }; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java index f238dc1049..5b403e95f8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import 
org.apache.nifi.web.api.dto.status.ConnectionStatusPredictionsSnapshotDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.FlowAnalysisRuleStatusDTO; @@ -1084,4 +1085,18 @@ public class StatusMerger { target.setValidationStatus(ValidationStatus.INVALID.name()); } } + + public static void merge(final ConnectorStatusDTO target, final ConnectorStatusDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + + if (ValidationStatus.VALIDATING.name().equalsIgnoreCase(toMerge.getValidationStatus())) { + target.setValidationStatus(ValidationStatus.VALIDATING.name()); + } else if (ValidationStatus.INVALID.name().equalsIgnoreCase(toMerge.getValidationStatus())) { + target.setValidationStatus(ValidationStatus.INVALID.name()); + } + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java index da0ed151f2..21f351b7a9 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java @@ -25,6 +25,7 @@ import org.apache.nifi.web.api.dto.ConnectorPropertyDescriptorDTO; import org.apache.nifi.web.api.dto.PermissionsDTO; import org.apache.nifi.web.api.dto.PropertyGroupConfigurationDTO; import org.apache.nifi.web.api.dto.RevisionDTO; +import 
org.apache.nifi.web.api.dto.status.ConnectorStatusDTO; import org.apache.nifi.web.api.entity.AllowableValueEntity; import org.apache.nifi.web.api.entity.ConnectorEntity; import org.junit.jupiter.api.Test; @@ -130,6 +131,22 @@ class ConnectorEntityMergerTest { assertEquals(2, config.getConfigurationStepConfigurations().size()); } + @Test + void testMergeConnectorStatusValidationStatusPriority() { + final ConnectorEntity clientEntity = createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID"); + final ConnectorEntity node1Entity = createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALIDATING"); + final ConnectorEntity node2Entity = createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID"); + + final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>(); + entityMap.put(getNodeIdentifier("client", 8000), clientEntity); + entityMap.put(getNodeIdentifier("node1", 8001), node1Entity); + entityMap.put(getNodeIdentifier("node2", 8002), node2Entity); + + ConnectorEntityMerger.merge(clientEntity, entityMap); + + assertEquals("VALIDATING", clientEntity.getStatus().getValidationStatus()); + } + private NodeIdentifier getNodeIdentifier(final String id, final int port) { return new NodeIdentifier(id, "localhost", port, "localhost", port + 1, "localhost", port + 2, port + 3, true); } @@ -153,6 +170,19 @@ class ConnectorEntityMergerTest { return entity; } + private ConnectorEntity createConnectorEntityWithStatus(final String id, final String state, final int activeThreadCount, final String validationStatus) { + final ConnectorEntity entity = createConnectorEntity(id, state); + + final ConnectorStatusDTO status = new ConnectorStatusDTO(); + status.setId(id); + status.setRunStatus(state); + status.setActiveThreadCount(activeThreadCount); + status.setValidationStatus(validationStatus); + entity.setStatus(status); + + return entity; + } + private ConnectorEntity createConnectorEntityWithConfig(final String id, final String state, final 
Map<String, List<String>> propertyAllowableValues) { final ConnectorEntity entity = createConnectorEntity(id, state); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java index bb3bdf6e2d..8a39f4b66a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java @@ -199,6 +199,18 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent */ Future<Void> drainFlowFiles(); + /** + * Cancels the draining of FlowFiles that is currently in progress. + * @throws IllegalStateException if the Connector is not currently draining FlowFiles + */ + void cancelDrainFlowFiles(); + + /** + * Verifies that the Connector can cancel draining FlowFiles. + * @throws IllegalStateException if not in a state where draining FlowFiles can be cancelled + */ + void verifyCancelDrainFlowFiles() throws IllegalStateException; + /** * Purges all FlowFiles from the Connector, immediately dropping the data. 
* diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index afb6e37bd9..471fbe10ae 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -41,9 +41,13 @@ import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.flow.Bundle; +import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedConfigurationStep; import org.apache.nifi.flow.VersionedConnectorValueReference; +import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; @@ -95,6 +99,7 @@ public class StandardConnectorNode implements ConnectorNode { private final ConnectorValidationTrigger validationTrigger; private final boolean extensionMissing; private volatile boolean triggerValidation = true; + private final AtomicReference<CompletableFuture<Void>> drainFutureRef = new AtomicReference<>(); private volatile FrameworkFlowContext workingFlowContext; @@ -398,6 +403,7 @@ public class StandardConnectorNode implements ConnectorNode { @Override public Future<Void> stop(final FlowEngine scheduler) { + logger.info("Stopping {}", this); final CompletableFuture<Void> stopCompleteFuture = new CompletableFuture<>(); 
stateTransition.setDesiredState(ConnectorState.STOPPED); @@ -406,7 +412,7 @@ public class StandardConnectorNode implements ConnectorNode { while (!stateUpdated) { final ConnectorState currentState = getCurrentState(); if (currentState == ConnectorState.STOPPED) { - logger.debug("{} is already {}; will not attempt to stop", this, currentState); + logger.info("{} is already stopped.", this); stopCompleteFuture.complete(null); return stopCompleteFuture; } @@ -430,8 +436,24 @@ public class StandardConnectorNode implements ConnectorNode { requireStopped("drain FlowFiles", ConnectorState.DRAINING); try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) { - final CompletableFuture<Void> future = connectorDetails.getConnector().drainFlowFiles(activeFlowContext); - final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> stateTransition.setCurrentState(ConnectorState.STOPPED)); + getComponentLog().info("Draining FlowFiles from {}", this); + final CompletableFuture<Void> drainFuture = connectorDetails.getConnector().drainFlowFiles(activeFlowContext); + drainFutureRef.set(drainFuture); + + final CompletableFuture<Void> stateUpdateFuture = drainFuture.whenComplete((result, failureCause) -> { + drainFutureRef.set(null); + logger.info("Successfully drained FlowFiles from {}; ensuring all components are stopped.", this); + + try { + connectorDetails.getConnector().stop(activeFlowContext); + } catch (final Exception e) { + logger.warn("Failed to stop {} after draining FlowFiles", this, e); + } + + stateTransition.setCurrentState(ConnectorState.STOPPED); + logger.info("All components of {} are now stopped after draining FlowFiles.", this); + }); + return stateUpdateFuture; } catch (final Throwable t) { stateTransition.setCurrentState(ConnectorState.STOPPED); @@ -439,6 +461,30 @@ public class StandardConnectorNode implements ConnectorNode { } } + 
@Override + public void cancelDrainFlowFiles() { + final Future<Void> future = this.drainFutureRef.getAndSet(null); + if (future == null) { + logger.debug("No active drain to cancel for {}; drain may have already completed", this); + return; + } + + future.cancel(true); + logger.info("Cancelled draining of FlowFiles for {}", this); + } + + @Override + public void verifyCancelDrainFlowFiles() throws IllegalStateException { + final ConnectorState state = getCurrentState(); + + // Allow if we're currently draining or if we're stopped; if stopped the cancel drain action will be a no-op + // but we don't want to throw an IllegalStateException in that case because doing so would mean that if one + // node in the cluster is stopped while another is draining we cannot cancel the drain. + if (state != ConnectorState.DRAINING && state != ConnectorState.STOPPED) { + throw new IllegalStateException("Cannot cancel draining of FlowFiles for " + this + " because its current state is " + state + "; it must be DRAINING."); + } + } + @Override public Future<Void> purgeFlowFiles(final String requestor) { requireStopped("purge FlowFiles", ConnectorState.PURGING); @@ -610,6 +656,9 @@ public class StandardConnectorNode implements ConnectorNode { logger.info("{} has no initial flow to load", this); } else { logger.info("Loading initial flow for {}", this); + // Update all RUNNING components to ENABLED before applying the initial flow so that components + // are not started before being configured. 
+ stopComponents(initialFlow.getFlowContents()); initializationContext.updateFlow(activeFlowContext, initialFlow); } @@ -617,6 +666,24 @@ public class StandardConnectorNode implements ConnectorNode { recreateWorkingFlowContext(); } + private void stopComponents(final VersionedProcessGroup group) { + for (final VersionedProcessor processor : group.getProcessors()) { + if (processor.getScheduledState() == ScheduledState.RUNNING) { + processor.setScheduledState(ScheduledState.ENABLED); + } + } + + for (final VersionedControllerService service : group.getControllerServices()) { + if (service.getScheduledState() == ScheduledState.RUNNING) { + service.setScheduledState(ScheduledState.ENABLED); + } + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + stopComponents(childGroup); + } + } + private void recreateWorkingFlowContext() { destroyWorkingContext(); workingFlowContext = flowContextFactory.createWorkingFlowContext(identifier, @@ -879,6 +946,7 @@ public class StandardConnectorNode implements ConnectorNode { actions.add(createDiscardWorkingConfigAction()); actions.add(createPurgeFlowFilesAction(stopped, dataQueued)); actions.add(createDrainFlowFilesAction(stopped, dataQueued)); + actions.add(createCancelDrainFlowFilesAction(currentState == ConnectorState.DRAINING)); actions.add(createApplyUpdatesAction(currentState)); actions.add(createDeleteAction(stopped, dataQueued)); @@ -987,6 +1055,15 @@ public class StandardConnectorNode implements ConnectorNode { return new StandardConnectorAction(actionName, description, allowed, reason); } + private ConnectorAction createCancelDrainFlowFilesAction(final boolean draining) { + if (draining) { + return new StandardConnectorAction("CANCEL_DRAIN_FLOWFILES", "Cancel the ongoing drain of FlowFiles", true, null); + } + + return new StandardConnectorAction("CANCEL_DRAIN_FLOWFILES", "Cancel the ongoing drain of FlowFiles", false, + "Connector is not currently draining FlowFiles"); + } + private 
ConnectorAction createApplyUpdatesAction(final ConnectorState currentState) { final boolean allowed; final String reason; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java index d47cb4ed2e..40b39694fd 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -107,11 +106,14 @@ public class StandardConnectorRepository implements ConnectorRepository { @Override public void applyUpdate(final ConnectorNode connector, final ConnectorUpdateContext context) throws FlowUpdateException { final ConnectorState initialDesiredState = connector.getDesiredState(); + logger.info("Applying update to Connector {}", connector); // Transition the connector's state to PREPARING_FOR_UPDATE before starting the background process. // This allows us to ensure that if we poll and see the state in the same state it was in before that // we know the update has already completed (successfully or otherwise). + logger.debug("Transitioning {} to PREPARING_FOR_UPDATE state before applying update", connector); connector.transitionStateForUpdating(); + logger.debug("{} is now in PREPARING_FOR_UPDATE state", connector); // Update connector in a background thread. This will handle transitioning the Connector state appropriately // so that it's clear when the update has completed. 
@@ -121,28 +123,24 @@ public class StandardConnectorRepository implements ConnectorRepository { private void updateConnector(final ConnectorNode connector, final ConnectorState initialDesiredState, final ConnectorUpdateContext context) { try { // Perform whatever preparation is necessary for the update. Default implementation is to stop the connector. + logger.debug("Preparing {} for update", connector); connector.prepareForUpdate(); // Wait for Connector State to become UPDATING + logger.debug("Waiting for {} to transition to UPDATING state", connector); waitForState(connector, Set.of(ConnectorState.UPDATING), Set.of(ConnectorState.PREPARING_FOR_UPDATE)); // Apply the update to the connector. + logger.info("{} has now completed preparations for update; applying update now", connector); connector.applyUpdate(); + logger.info("{} has successfully applied update", connector); // Now that the update has been applied, save the flow so that the updated configuration is persisted. context.saveFlow(); - // Wait for Connector State to become UPDATED, or to revert to the initial desired state because, depending upon timing, - // other nodes may have already seen the transition to UPDATED and moved the connector back to the initial desired state. - final Set<ConnectorState> desirableStates = new HashSet<>(); - desirableStates.add(initialDesiredState); - desirableStates.add(ConnectorState.UPDATED); - if (initialDesiredState == ConnectorState.RUNNING) { - desirableStates.add(ConnectorState.STARTING); - } else if (initialDesiredState == ConnectorState.STOPPED) { - desirableStates.add(ConnectorState.STOPPING); - } - waitForState(connector, desirableStates, Set.of(ConnectorState.UPDATING)); + // Wait for all nodes to complete the update. + waitForState(connector, Set.of(ConnectorState.UPDATED), Set.of(ConnectorState.UPDATING)); + logger.info("{} has successfully completed update on all nodes", connector); // If the initial desired state was RUNNING, start the connector again. 
Otherwise, stop it. // We don't simply leave it be as the prepareForUpdate / update may have changed the state of some components. diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java index 16ebbc3e39..70ff4e079d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java @@ -28,6 +28,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import java.util.Collections; +import java.util.HashMap; public class StandardFlowContext implements FrameworkFlowContext { private final ProcessGroup managedProcessGroup; @@ -76,6 +77,10 @@ public class StandardFlowContext implements FrameworkFlowContext { @Override public void updateFlow(final VersionedExternalFlow versionedExternalFlow, final AssetManager assetManager) throws FlowUpdateException { + if (versionedExternalFlow.getParameterContexts() == null) { + versionedExternalFlow.setParameterContexts(new HashMap<>()); + } + final String parameterContextName = managedProcessGroup.getParameterContext().getName(); updateParameterContext(versionedExternalFlow.getFlowContents(), parameterContextName); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java index 4eba662d26..d306ac45aa 100644 --- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java @@ -52,6 +52,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -509,6 +510,33 @@ public class TestStandardConnectorNode { assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState()); } + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + public void testCancelDrainFlowFilesInterruptsConnector() throws Exception { + final BusyLoopDrainConnector busyLoopConnector = new BusyLoopDrainConnector(); + final StandardConnectorNode connectorNode = createConnectorNode(busyLoopConnector); + + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + + connectorNode.drainFlowFiles(); + + assertTrue(busyLoopConnector.awaitDrainStarted(2, TimeUnit.SECONDS)); + assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState()); + + Thread.sleep(1000); + + assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState()); + assertFalse(busyLoopConnector.wasInterrupted()); + assertFalse(busyLoopConnector.wasStopCalled()); + + connectorNode.cancelDrainFlowFiles(); + + assertTrue(busyLoopConnector.awaitDrainCompleted(2, TimeUnit.SECONDS)); + assertTrue(busyLoopConnector.wasInterrupted()); + assertTrue(busyLoopConnector.wasStopCalled()); + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + } + private StandardConnectorNode createConnectorNode() throws FlowUpdateException { final SleepingConnector sleepingConnector = new 
SleepingConnector(Duration.ofMillis(1)); return createConnectorNode(sleepingConnector); @@ -727,4 +755,94 @@ public class TestStandardConnectorNode { } } + /** + * Test connector that uses a busy loop for draining that never completes naturally, + * but can be interrupted via cancellation. + */ + private static class BusyLoopDrainConnector extends AbstractConnector { + private volatile boolean interrupted = false; + private volatile boolean stopCalled = false; + private final AtomicReference<Thread> drainThreadRef = new AtomicReference<>(); + private final CountDownLatch drainStartedLatch = new CountDownLatch(1); + private final CountDownLatch drainCompletedLatch = new CountDownLatch(1); + + @Override + public VersionedExternalFlow getInitialFlow() { + return null; + } + + @Override + public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) { + } + + @Override + public List<ConfigurationStep> getConfigurationSteps() { + return List.of(); + } + + @Override + public void applyUpdate(final FlowContext workingContext, final FlowContext activeContext) { + } + + @Override + protected void onStepConfigured(final String stepName, final FlowContext workingContext) { + } + + @Override + public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final Map<String, String> overrides, final FlowContext flowContext) { + return List.of(); + } + + @Override + public void stop(final FlowContext activeContext) { + stopCalled = true; + } + + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + + final Thread drainThread = new Thread(() -> { + drainStartedLatch.countDown(); + try { + while (!Thread.currentThread().isInterrupted()) { + // Busy loop that never completes naturally + } + } finally { + interrupted = Thread.currentThread().isInterrupted(); + drainCompletedLatch.countDown(); + } + }); + + 
drainThreadRef.set(drainThread); + + future.whenComplete((result, throwable) -> { + final Thread thread = drainThreadRef.get(); + if (thread != null) { + thread.interrupt(); + } + }); + + drainThread.start(); + + return future; + } + + public boolean awaitDrainStarted(final long timeout, final TimeUnit unit) throws InterruptedException { + return drainStartedLatch.await(timeout, unit); + } + + public boolean awaitDrainCompleted(final long timeout, final TimeUnit unit) throws InterruptedException { + return drainCompletedLatch.await(timeout, unit); + } + + public boolean wasInterrupted() { + return interrupted; + } + + public boolean wasStopCalled() { + return stopCalled; + } + } + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 967c0e1f29..d270c0344f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -210,6 +210,14 @@ public interface NiFiServiceFacade { ConnectorEntity scheduleConnector(Revision revision, String id, ScheduledState state); + void verifyDrainConnector(String id); + + ConnectorEntity drainConnector(Revision revision, String id); + + void verifyCancelConnectorDrain(String id); + + ConnectorEntity cancelConnectorDrain(Revision revision, String id); + ConfigurationStepNamesEntity getConnectorConfigurationSteps(String id); ConfigurationStepEntity getConnectorConfigurationStep(String id, String configurationStepName); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 1826a1efd4..bee52d0446 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -81,6 +81,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.connector.Connector; import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorState; import org.apache.nifi.components.connector.ConnectorUpdateContext; import org.apache.nifi.components.connector.Secret; import org.apache.nifi.components.connector.secrets.AuthorizableSecret; @@ -3609,6 +3610,62 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createConnectorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions); } + @Override + public void verifyDrainConnector(final String id) { + final ConnectorNode connector = connectorDAO.getConnector(id); + final ConnectorState currentState = connector.getCurrentState(); + if (currentState != ConnectorState.STOPPED) { + throw new IllegalStateException("Cannot drain FlowFiles for Connector " + id + " because it is not currently stopped. 
Current state: " + currentState); + } + } + + @Override + public ConnectorEntity drainConnector(final Revision revision, final String id) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionClaim claim = new StandardRevisionClaim(revision); + + final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> { + connectorDAO.drainFlowFiles(id); + controllerFacade.save(); + + final ConnectorNode node = connectorDAO.getConnector(id); + final ConnectorDTO dto = dtoFactory.createConnectorDto(node); + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastMod); + }); + + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); + return entityFactory.createConnectorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions); + } + + @Override + public void verifyCancelConnectorDrain(final String id) { + connectorDAO.verifyCancelDrainFlowFile(id); + } + + @Override + public ConnectorEntity cancelConnectorDrain(final Revision revision, final String id) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionClaim claim = new StandardRevisionClaim(revision); + + final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> { + connectorDAO.cancelDrainFlowFiles(id); + controllerFacade.save(); + + final ConnectorNode node = connectorDAO.getConnector(id); + final ConnectorDTO dto = dtoFactory.createConnectorDto(node); + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new 
StandardRevisionUpdate<>(dto, lastMod); + }); + + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); + return entityFactory.createConnectorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions); + } + @Override public ConfigurationStepNamesEntity getConnectorConfigurationSteps(final String id) { final ConnectorNode node = connectorDAO.getConnector(id); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java index cb5146afdb..f7cd6c50f0 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java @@ -622,6 +622,155 @@ public class ConnectorResource extends ApplicationResource { ); } + /** + * Initiates the draining of FlowFiles for the specified connector. + * + * @param id The id of the connector to drain. + * @param requestConnectorEntity A connectorEntity containing the revision. + * @return A connectorEntity. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{id}/drain") + @Operation( + summary = "Initiates draining of FlowFiles for a connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ConnectorEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + description = "This will initiate draining of FlowFiles for a stopped connector. Draining allows the connector to process " + + "data that is currently in the flow but does not ingest any additional data. The connector must be in a STOPPED state " + + "before draining can begin. Once initiated, the connector will transition to a DRAINING state. Use the DELETE method " + + "on this endpoint to cancel an ongoing drain operation.", + security = { + @SecurityRequirement(name = "Write - /connectors/{uuid} or /operation/connectors/{uuid}") + } + ) + public Response initiateDrain( + @Parameter( + description = "The connector id.", + required = true + ) + @PathParam("id") final String id, + @Parameter( + description = "The connector entity with revision.", + required = true + ) final ConnectorEntity requestConnectorEntity) { + + if (requestConnectorEntity == null || requestConnectorEntity.getRevision() == null) { + throw new IllegalArgumentException("Connector entity with revision must be specified."); + } + + if (requestConnectorEntity.getId() != null && !id.equals(requestConnectorEntity.getId())) { + throw new IllegalArgumentException(String.format("The connector id (%s) in the request body does not equal the " + + "connector id of the requested resource (%s).", requestConnectorEntity.getId(), id)); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.POST, requestConnectorEntity); + } else if (isDisconnectedFromCluster()) { + 
verifyDisconnectedNodeModification(requestConnectorEntity.isDisconnectedNodeAcknowledged()); + } + + final Revision requestRevision = getRevision(requestConnectorEntity, id); + return withWriteLock( + serviceFacade, + requestConnectorEntity, + requestRevision, + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final Authorizable connector = lookup.getConnector(id); + OperationAuthorizable.authorizeOperation(connector, authorizer, user); + }, + () -> serviceFacade.verifyDrainConnector(id), + (revision, connectorEntity) -> { + final ConnectorEntity entity = serviceFacade.drainConnector(revision, id); + populateRemainingConnectorEntityContent(entity); + + return generateOkResponse(entity).build(); + } + ); + } + + /** + * Cancels the draining of FlowFiles for the specified connector. + * + * @param version The revision is used to verify the client is working with the latest version of the flow. + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the connector to cancel draining for. + * @return A connectorEntity. + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{id}/drain") + @Operation( + summary = "Cancels the draining of FlowFiles for a connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ConnectorEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Write - /connectors/{uuid} or /operation/connectors/{uuid}") + } + ) + public Response cancelDrain( + @Parameter( + description = "The revision is used to verify the client is working with the latest version of the flow." + ) + @QueryParam(VERSION) final LongParameter version, + @Parameter( + description = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response." + ) + @QueryParam(CLIENT_ID) final ClientIdParameter clientId, + @Parameter( + description = "Acknowledges that this node is disconnected to allow for mutable requests to proceed." + ) + @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, + @Parameter( + description = "The connector id.", + required = true + ) + @PathParam("id") final String id) { + + if (isReplicateRequest()) { + return replicate(HttpMethod.DELETE); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); + } + + final ConnectorEntity requestConnectorEntity = new ConnectorEntity(); + requestConnectorEntity.setId(id); + + final Revision requestRevision = new Revision(version == null ? 
null : version.getLong(), clientId.getClientId(), id); + return withWriteLock( + serviceFacade, + requestConnectorEntity, + requestRevision, + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final Authorizable connector = lookup.getConnector(id); + OperationAuthorizable.authorizeOperation(connector, authorizer, user); + }, + () -> serviceFacade.verifyCancelConnectorDrain(id), + (revision, connectorEntity) -> { + final ConnectorEntity entity = serviceFacade.cancelConnectorDrain(revision, id); + populateRemainingConnectorEntityContent(entity); + + return generateOkResponse(entity).build(); + } + ); + } + /** * Gets the configuration step names for the specified connector. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java index a987dde126..d094f18ca9 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java @@ -45,6 +45,12 @@ public interface ConnectorDAO { void stopConnector(String id); + void drainFlowFiles(String id); + + void cancelDrainFlowFiles(String id); + + void verifyCancelDrainFlowFile(String id); + void updateConnectorConfigurationStep(String id, String configurationStepName, ConfigurationStepConfigurationDTO configurationStepConfiguration); void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java index e55e6ceee0..19b58dd207 100644 --- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java @@ -127,6 +127,24 @@ public class StandardConnectorDAO implements ConnectorDAO { getConnectorRepository().stopConnector(connector); } + @Override + public void drainFlowFiles(final String id) { + final ConnectorNode connector = getConnector(id); + connector.drainFlowFiles(); + } + + @Override + public void cancelDrainFlowFiles(final String id) { + final ConnectorNode connector = getConnector(id); + connector.cancelDrainFlowFiles(); + } + + @Override + public void verifyCancelDrainFlowFile(final String id) { + final ConnectorNode connector = getConnector(id); + connector.verifyCancelDrainFlowFiles(); + } + @Override public void updateConnectorConfigurationStep(final String id, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepDto) { final ConnectorNode connector = getConnector(id); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java index 17678723c7..60702b8b23 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java @@ -412,12 +412,72 @@ public class TestConnectorResource { doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class)); assertThrows(AccessDeniedException.class, () -> - connectorResource.getControllerServicesFromConnectorProcessGroup(CONNECTOR_ID, PROCESS_GROUP_ID, true, false, true)); + 
connectorResource.getControllerServicesFromConnectorProcessGroup(CONNECTOR_ID, PROCESS_GROUP_ID, true, false, true)); verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class)); verify(serviceFacade, never()).getConnectorControllerServices(anyString(), anyString(), eq(true), eq(false), eq(true)); } + @Test + public void testInitiateDrain() { + final ConnectorEntity requestEntity = createConnectorEntity(); + final ConnectorEntity responseEntity = createConnectorEntity(); + responseEntity.getComponent().setState("DRAINING"); + + when(serviceFacade.drainConnector(any(Revision.class), eq(CONNECTOR_ID))).thenReturn(responseEntity); + + try (Response response = connectorResource.initiateDrain(CONNECTOR_ID, requestEntity)) { + assertEquals(200, response.getStatus()); + assertEquals(responseEntity, response.getEntity()); + } + + verify(serviceFacade).verifyDrainConnector(CONNECTOR_ID); + verify(serviceFacade).drainConnector(any(Revision.class), eq(CONNECTOR_ID)); + } + + @Test + public void testInitiateDrainNotAuthorized() { + final ConnectorEntity requestEntity = createConnectorEntity(); + + doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class)); + + assertThrows(AccessDeniedException.class, () -> + connectorResource.initiateDrain(CONNECTOR_ID, requestEntity)); + + verify(serviceFacade, never()).verifyDrainConnector(anyString()); + verify(serviceFacade, never()).drainConnector(any(Revision.class), anyString()); + } + + @Test + public void testInitiateDrainWithNullEntity() { + assertThrows(IllegalArgumentException.class, () -> + connectorResource.initiateDrain(CONNECTOR_ID, null)); + + verify(serviceFacade, never()).drainConnector(any(Revision.class), anyString()); + } + + @Test + public void testInitiateDrainWithNullRevision() { + final ConnectorEntity requestEntity = createConnectorEntity(); + requestEntity.setRevision(null); + + assertThrows(IllegalArgumentException.class, () -> + 
connectorResource.initiateDrain(CONNECTOR_ID, requestEntity)); + + verify(serviceFacade, never()).drainConnector(any(Revision.class), anyString()); + } + + @Test + public void testInitiateDrainWithMismatchedId() { + final ConnectorEntity requestEntity = createConnectorEntity(); + requestEntity.setId("different-id"); + + assertThrows(IllegalArgumentException.class, () -> + connectorResource.initiateDrain(CONNECTOR_ID, requestEntity)); + + verify(serviceFacade, never()).drainConnector(any(Revision.class), anyString()); + } + private ConnectorEntity createConnectorEntity() { final ConnectorEntity entity = new ConnectorEntity(); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java index 920c61480a..987b7e812d 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java @@ -50,7 +50,8 @@ public class DataQueuingConnector extends AbstractConnector { bundle.setVersion("2.8.0-SNAPSHOT"); final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING); + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.ENABLED); + generate.setSchedulingPeriod("100 millis"); final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", 
"org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java similarity index 64% copy from nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java copy to nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java index 920c61480a..857e77234f 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java @@ -20,6 +20,10 @@ package org.apache.nifi.connectors.tests.system; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.connector.AbstractConnector; import org.apache.nifi.components.connector.ConfigurationStep; +import org.apache.nifi.components.connector.ConnectorPropertyDescriptor; +import org.apache.nifi.components.connector.ConnectorPropertyGroup; +import org.apache.nifi.components.connector.FlowUpdateException; +import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.ConnectableComponent; @@ -30,16 +34,47 @@ import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; 
import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.processor.util.StandardValidators; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -public class DataQueuingConnector extends AbstractConnector { +/** + * A connector that generates FlowFiles and allows controlling when the TerminateFlowFile processor + * will process them via a gate file. If the gate file does not exist, FlowFiles will queue up. + * When the gate file exists, the TerminateFlowFile processor will process the queued FlowFiles. + * <p> + * The connector uses a configuration step to specify the gate file path. The test must configure + * the connector with a gate file path and apply the update before starting the connector. + */ +public class GatedDataQueuingConnector extends AbstractConnector { + + private static final String TERMINATE_PROCESSOR_ID = "term-1"; + + static final ConnectorPropertyDescriptor GATE_FILE_PATH = new ConnectorPropertyDescriptor.Builder() + .name("Gate File Path") + .description("The path to the gate file. When this file exists, the TerminateFlowFile processor " + + "will process FlowFiles. 
When it does not exist, FlowFiles will queue up.") + .required(true) + .type(PropertyType.STRING) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final ConnectorPropertyGroup PROPERTY_GROUP = new ConnectorPropertyGroup.Builder() + .name("Gate Configuration") + .addProperty(GATE_FILE_PATH) + .build(); + + private static final ConfigurationStep CONFIG_STEP = new ConfigurationStep.Builder() + .name("Gate Configuration") + .propertyGroups(List.of(PROPERTY_GROUP)) + .build(); + @Override protected void onStepConfigured(final String stepName, final FlowContext workingContext) { - } @Override @@ -47,13 +82,16 @@ public class DataQueuingConnector extends AbstractConnector { final Bundle bundle = new Bundle(); bundle.setGroup("org.apache.nifi"); bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); + bundle.setVersion("2.7.0-SNAPSHOT"); final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING); + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, + Map.of("File Size", "1 KB"), ScheduledState.ENABLED); + generate.setSchedulingPeriod("100 millis"); - final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); + final VersionedProcessor terminate = createVersionedProcessor(TERMINATE_PROCESSOR_ID, "1234", "TerminateFlowFile", + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, + Collections.emptyMap(), ScheduledState.ENABLED); final ConnectableComponent source = new ConnectableComponent(); source.setId(generate.getIdentifier()); @@ -80,7 +118,7 @@ public class DataQueuingConnector extends AbstractConnector { connection.setzIndex(1L); final 
VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Data Queuing Connector"); + rootGroup.setName("Gated Data Queuing Connector"); rootGroup.setIdentifier("1234"); rootGroup.setProcessors(Set.of(generate, terminate)); rootGroup.setConnections(Set.of(connection)); @@ -98,11 +136,28 @@ public class DataQueuingConnector extends AbstractConnector { @Override public List<ConfigurationStep> getConfigurationSteps() { - return List.of(); + return List.of(CONFIG_STEP); } @Override - public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { + public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) throws FlowUpdateException { + final String gateFilePath = workingFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, GATE_FILE_PATH).getValue(); + if (gateFilePath == null) { + return; + } + + final VersionedExternalFlow flow = getInitialFlow(); + final VersionedProcessGroup rootGroup = flow.getFlowContents(); + + for (final VersionedProcessor processor : rootGroup.getProcessors()) { + if (TERMINATE_PROCESSOR_ID.equals(processor.getIdentifier())) { + final Map<String, String> properties = new HashMap<>(processor.getProperties()); + properties.put("Gate File", gateFilePath); + processor.setProperties(properties); + } + } + + getInitializationContext().updateFlow(activeFlowContext, flow); } private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java index 734766fa37..9ab6fd71b1 100644 --- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java @@ -16,15 +16,43 @@ */ package org.apache.nifi.processors.tests.system; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.File; +import java.util.List; public class TerminateFlowFile extends AbstractProcessor { + + public static final PropertyDescriptor GATE_FILE = new PropertyDescriptor.Builder() + .name("Gate File") + .description("An optional file path. If specified, the processor will only process FlowFiles when this file exists. 
" + + "If the file does not exist, the processor will yield and return without processing any data.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return List.of(GATE_FILE); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String gateFilePath = context.getProperty(GATE_FILE).getValue(); + if (gateFilePath != null) { + final File gateFile = new File(gateFilePath); + if (!gateFile.exists()) { + context.yield(); + return; + } + } + FlowFile flowFile = session.get(); if (flowFile == null) { return; diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector index 34fe7c90ea..3885e4c079 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector @@ -16,5 +16,6 @@ org.apache.nifi.connectors.tests.system.NopConnector org.apache.nifi.connectors.tests.system.AssetConnector org.apache.nifi.connectors.tests.system.DataQueuingConnector +org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector org.apache.nifi.connectors.tests.system.CalculateConnector diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 51272e8b8e..1a3b91c366 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -406,6 +406,16 @@ public class NiFiClientUtil { } } + public void startConnector(final ConnectorEntity connectorEntity) throws NiFiClientException, IOException, InterruptedException { + startConnector(connectorEntity.getId()); + } + + public void startConnector(final String connectorId) throws NiFiClientException, IOException, InterruptedException { + final ConnectorEntity entity = getConnectorClient().getConnector(connectorId); + getConnectorClient().startConnector(entity); + waitForConnectorState(connectorId, ConnectorState.RUNNING); + } + public void stopConnector(final ConnectorEntity connectorEntity) throws NiFiClientException, IOException, InterruptedException { stopConnector(connectorEntity.getId()); } @@ -437,6 +447,20 @@ public class NiFiClientUtil { } } + public ConnectorEntity drainConnector(final String connectorId) throws NiFiClientException, IOException { + final ConnectorEntity entity = getConnectorClient().getConnector(connectorId); + return getConnectorClient().drainConnector(entity); + } + + public ConnectorEntity cancelDrain(final String connectorId) throws NiFiClientException, IOException { + final ConnectorEntity entity = getConnectorClient().getConnector(connectorId); + return getConnectorClient().cancelDrain(entity); + } + + public void waitForConnectorDraining(final String connectorId) throws NiFiClientException, IOException, InterruptedException { + waitForConnectorState(connectorId, ConnectorState.DRAINING); + } + public ParameterProviderEntity createParameterProvider(final String simpleTypeName) throws NiFiClientException, IOException { return 
createParameterProvider(NiFiSystemIT.TEST_PARAM_PROVIDERS_PACKAGE + "." + simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion); } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index b73814250f..93614725e7 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -468,6 +468,29 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { logger.info("Queue Count for Connection {} is now {}", connectionId, queueSize); } + protected void waitForConnectorMinQueueCount(final String connectorId, final int minQueueSize) throws InterruptedException { + logger.info("Waiting for Queue Count of at least {} in Connector {}", minQueueSize, connectorId); + + waitFor(() -> { + try { + final ProcessGroupStatusEntity statusEntity = getNifiClient().getConnectorClient().getStatus(connectorId, true); + final int currentSize = statusEntity.getProcessGroupStatus().getAggregateSnapshot().getFlowFilesQueued(); + logEverySecond("Current Queue Size for Connector {} = {}, Waiting for at least {}", connectorId, currentSize, minQueueSize); + return currentSize >= minQueueSize; + } catch (final Exception e) { + logger.error("Failed to get connector queue count", e); + return false; + } + }); + + logger.info("Queue Count for Connector {} is now at least {}", connectorId, minQueueSize); + } + + protected int getConnectorQueuedFlowFileCount(final String connectorId) throws NiFiClientException, IOException { + final ProcessGroupStatusEntity statusEntity = getNifiClient().getConnectorClient().getStatus(connectorId, true); + return 
statusEntity.getProcessGroupStatus().getAggregateSnapshot().getFlowFilesQueued(); + } + private void waitForQueueCountToMatch(final String connectionId, final Predicate<Integer> test, final String queueSizeDescription) throws InterruptedException { waitFor(() -> { final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java new file mode 100644 index 0000000000..9ee3fbd09b --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.tests.system.connectors; + +import org.apache.nifi.components.connector.ConnectorState; +import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Cluster-specific tests for connector draining functionality. + * These tests verify that draining works correctly when nodes complete draining at different times. + */ +public class ClusteredConnectorDrainIT extends NiFiSystemIT { + + private static final Logger logger = LoggerFactory.getLogger(ClusteredConnectorDrainIT.class); + + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createTwoNodeInstanceFactory(); + } + + /** + * Tests that when draining in a cluster: + * 1. Create gate file on Node 1 only - Node 1 finishes draining (STOPPED), Node 2 still DRAINING + * 2. Aggregate state should be DRAINING (since at least one node is still draining) + * 3. Create gate file on Node 2 - both nodes finish draining + * 4. 
Aggregate state should be STOPPED + */ + @Test + public void testDrainWithNodeCompletingAtDifferentTimes() throws NiFiClientException, IOException, InterruptedException { + final File node1InstanceDir = getNiFiInstance().getNodeInstance(1).getInstanceDirectory(); + final File node2InstanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory(); + final File node1GateFile = new File(node1InstanceDir, "gate-file.txt"); + final File node2GateFile = new File(node2InstanceDir, "gate-file.txt"); + node1GateFile.deleteOnExit(); + node2GateFile.deleteOnExit(); + deleteIfExists(node1GateFile); + deleteIfExists(node2GateFile); + + logger.info("Creating GatedDataQueuingConnector"); + final ConnectorEntity connector = getClientUtil().createConnector("GatedDataQueuingConnector"); + assertNotNull(connector); + final String connectorId = connector.getId(); + + final String gateFilePath = "./gate-file.txt"; + logger.info("Configuring connector {} with gate file path: {}", connectorId, gateFilePath); + getClientUtil().configureConnector(connector, "Gate Configuration", Map.of("Gate File Path", gateFilePath)); + getClientUtil().applyConnectorUpdate(connector); + + logger.info("Starting connector {}", connectorId); + getClientUtil().startConnector(connectorId); + + logger.info("Waiting for at least 5 FlowFiles to queue"); + waitForConnectorMinQueueCount(connectorId, 5); + + logger.info("Stopping connector {}", connectorId); + getClientUtil().stopConnector(connectorId); + getClientUtil().waitForConnectorStopped(connectorId); + + final int queuedCountBeforeDrain = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Queued FlowFile count before drain: {}", queuedCountBeforeDrain); + assertTrue(queuedCountBeforeDrain > 0); + + logger.info("Initiating drain for connector {}", connectorId); + getClientUtil().drainConnector(connectorId); + + logger.info("Waiting for aggregate connector state to be DRAINING"); + getClientUtil().waitForConnectorDraining(connectorId); + + 
logger.info("Creating gate file for Node 1 only: {}", node1GateFile.getAbsolutePath()); + assertTrue(node1GateFile.createNewFile()); + + logger.info("Waiting for Node 1 to finish draining and become STOPPED"); + waitForNodeConnectorState(1, connectorId, ConnectorState.STOPPED); + waitForNodeConnectorState(2, connectorId, ConnectorState.DRAINING); + + logger.info("Verifying aggregate state is still DRAINING (Node 2 still draining)"); + final ConnectorEntity aggregateConnector = getNifiClient().getConnectorClient().getConnector(connectorId); + assertEquals(ConnectorState.DRAINING.name(), aggregateConnector.getComponent().getState()); + + logger.info("Creating gate file for Node 2: {}", node2GateFile.getAbsolutePath()); + assertTrue(node2GateFile.createNewFile()); + + logger.info("Waiting for aggregate state to become STOPPED"); + waitFor(() -> { + try { + final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId); + return ConnectorState.STOPPED.name().equals(entity.getComponent().getState()); + } catch (final Exception e) { + return false; + } + }); + + final ConnectorEntity finalConnector = getNifiClient().getConnectorClient().getConnector(connectorId); + logger.info("Final aggregate connector state: {}", finalConnector.getComponent().getState()); + assertEquals(ConnectorState.STOPPED.name(), finalConnector.getComponent().getState()); + + final int finalQueuedCount = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Final queued FlowFile count: {}", finalQueuedCount); + assertEquals(0, finalQueuedCount); + + logger.info("testDrainWithNodeCompletingAtDifferentTimes completed successfully"); + } + + /** + * Tests that when canceling drain in a cluster where one node has already finished: + * 1. Create gate file on Node 1 only - Node 1 finishes draining (STOPPED), Node 2 still DRAINING + * 2. Aggregate state should be DRAINING + * 3. Cancel drain - Node 2 should stop draining + * 4. Aggregate state should be STOPPED + * 5. 
Data should still be queued (from Node 2) + */ + @Test + public void testCancelDrainWithOneNodeAlreadyComplete() throws NiFiClientException, IOException, InterruptedException { + final File node1InstanceDir = getNiFiInstance().getNodeInstance(1).getInstanceDirectory(); + final File node2InstanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory(); + final File node1GateFile = new File(node1InstanceDir, "gate-file-cancel.txt"); + final File node2GateFile = new File(node2InstanceDir, "gate-file-cancel.txt"); + node1GateFile.deleteOnExit(); + node2GateFile.deleteOnExit(); + deleteIfExists(node1GateFile); + deleteIfExists(node2GateFile); + + logger.info("Creating GatedDataQueuingConnector"); + final ConnectorEntity connector = getClientUtil().createConnector("GatedDataQueuingConnector"); + assertNotNull(connector); + final String connectorId = connector.getId(); + + final String gateFilePath = "./gate-file-cancel.txt"; + logger.info("Configuring connector {} with gate file path: {}", connectorId, gateFilePath); + getClientUtil().configureConnector(connector, "Gate Configuration", Map.of("Gate File Path", gateFilePath)); + getClientUtil().applyConnectorUpdate(connector); + + logger.info("Starting connector {}", connectorId); + getClientUtil().startConnector(connectorId); + + logger.info("Waiting for at least 5 FlowFiles to queue"); + waitForConnectorMinQueueCount(connectorId, 5); + + logger.info("Stopping connector {}", connectorId); + getClientUtil().stopConnector(connectorId); + getClientUtil().waitForConnectorStopped(connectorId); + + final int queuedCountBeforeDrain = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Queued FlowFile count before drain: {}", queuedCountBeforeDrain); + assertTrue(queuedCountBeforeDrain > 0); + + logger.info("Initiating drain for connector {}", connectorId); + getClientUtil().drainConnector(connectorId); + + logger.info("Waiting for aggregate connector state to be DRAINING"); + 
getClientUtil().waitForConnectorDraining(connectorId); + + logger.info("Creating gate file for Node 1 only: {}", node1GateFile.getAbsolutePath()); + assertTrue(node1GateFile.createNewFile()); + + logger.info("Waiting for Node 1 to finish draining and become STOPPED"); + waitForNodeConnectorState(1, connectorId, ConnectorState.STOPPED); + + logger.info("Verifying aggregate state is still DRAINING (Node 2 still draining)"); + final ConnectorEntity aggregateBeforeCancel = getNifiClient().getConnectorClient().getConnector(connectorId); + assertEquals(ConnectorState.DRAINING.name(), aggregateBeforeCancel.getComponent().getState()); + + logger.info("Canceling drain for connector {}", connectorId); + getClientUtil().cancelDrain(connectorId); + + logger.info("Waiting for aggregate state to become STOPPED after cancel"); + getClientUtil().waitForConnectorStopped(connectorId); + + final ConnectorEntity finalConnector = getNifiClient().getConnectorClient().getConnector(connectorId); + logger.info("Final aggregate connector state: {}", finalConnector.getComponent().getState()); + assertEquals(ConnectorState.STOPPED.name(), finalConnector.getComponent().getState()); + + final int finalQueuedCount = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Final queued FlowFile count after cancel (should still have data from Node 2): {}", finalQueuedCount); + assertTrue(finalQueuedCount > 0); + + logger.info("testCancelDrainWithOneNodeAlreadyComplete completed successfully"); + } + + private void waitForNodeConnectorState(final int nodeIndex, final String connectorId, final ConnectorState expectedState) throws InterruptedException { + logger.info("Waiting for Node {} connector {} to reach state {}", nodeIndex, connectorId, expectedState); + waitFor(() -> { + try { + switchClientToNode(nodeIndex); + final ConnectorEntity entity = getNifiClient().getConnectorClient(DO_NOT_REPLICATE).getConnector(connectorId); + return 
expectedState.name().equals(entity.getComponent().getState()); + } catch (final Exception e) { + return false; + } + }); + logger.info("Node {} connector {} reached state {}", nodeIndex, connectorId, expectedState); + } + + private void deleteIfExists(final File file) { + if (file.exists()) { + assertTrue(file.delete()); + } + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java new file mode 100644 index 0000000000..eef6c4bf97 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.tests.system.connectors; + +import org.apache.nifi.components.connector.ConnectorState; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConnectorDrainIT extends NiFiSystemIT { + + private static final Logger logger = LoggerFactory.getLogger(ConnectorDrainIT.class); + + @Test + public void testDrainFlowFiles() throws NiFiClientException, IOException, InterruptedException { + final File gateFile = new File(getNiFiInstance().getInstanceDirectory(), "gate-file.txt"); + gateFile.deleteOnExit(); + + if (gateFile.exists()) { + assertTrue(gateFile.delete()); + } + + logger.info("Creating GatedDataQueuingConnector"); + final ConnectorEntity connector = getClientUtil().createConnector("GatedDataQueuingConnector"); + assertNotNull(connector); + final String connectorId = connector.getId(); + + logger.info("Configuring connector {} with gate file path: {}", connectorId, gateFile.getAbsolutePath()); + getClientUtil().configureConnector(connector, "Gate Configuration", Map.of("Gate File Path", gateFile.getAbsolutePath())); + getClientUtil().applyConnectorUpdate(connector); + + logger.info("Starting connector {}", connectorId); + getClientUtil().startConnector(connectorId); + + logger.info("Waiting for at least 5 FlowFiles to queue"); + waitForConnectorMinQueueCount(connectorId, 5); + + logger.info("Stopping connector {}", connectorId); + getClientUtil().stopConnector(connectorId); + getClientUtil().waitForConnectorStopped(connectorId); + + final int 
queuedCountBeforeDrain = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Queued FlowFile count before drain: {}", queuedCountBeforeDrain); + assertTrue(queuedCountBeforeDrain > 0); + + logger.info("Initiating drain for connector {}", connectorId); + getClientUtil().drainConnector(connectorId); + + logger.info("Waiting for connector to enter DRAINING state"); + getClientUtil().waitForConnectorDraining(connectorId); + + logger.info("Sleeping for 2 seconds to verify connector remains in DRAINING state"); + Thread.sleep(2000L); + + ConnectorEntity drainingConnector = getNifiClient().getConnectorClient().getConnector(connectorId); + logger.info("Connector state after 2 seconds: {}", drainingConnector.getComponent().getState()); + assertEquals(ConnectorState.DRAINING.name(), drainingConnector.getComponent().getState()); + + final int queuedWhileDraining = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Queued FlowFile count while draining (gate file absent): {}", queuedWhileDraining); + assertTrue(queuedWhileDraining > 0); + + logger.info("Creating gate file to allow draining to complete: {}", gateFile.getAbsolutePath()); + assertTrue(gateFile.createNewFile()); + + logger.info("Waiting for connector to complete draining and return to STOPPED state"); + waitFor(() -> { + try { + final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId); + return ConnectorState.STOPPED.name().equals(entity.getComponent().getState()); + } catch (final Exception e) { + return false; + } + }); + + final ConnectorEntity finalConnector = getNifiClient().getConnectorClient().getConnector(connectorId); + logger.info("Final connector state: {}", finalConnector.getComponent().getState()); + assertEquals(ConnectorState.STOPPED.name(), finalConnector.getComponent().getState()); + + final int finalQueuedCount = getConnectorQueuedFlowFileCount(connectorId); + logger.info("Final queued FlowFile count: {}", finalQueuedCount); + assertEquals(0, 
finalQueuedCount);
+
+        logger.info("testDrainFlowFiles completed successfully");
+    }
+    /**
+     * Verifies that an in-progress drain can be canceled while FlowFiles remain queued.
+     *
+     * Starts a GatedDataQueuingConnector configured with a gate file path at which no file exists (any leftover file
+     * is deleted first, and this test never creates one), waits for at least 5 queued FlowFiles, stops the connector,
+     * and initiates a drain. Asserts that after 2 seconds the connector still reports DRAINING with FlowFiles queued,
+     * then cancels the drain and asserts that the connector returns to STOPPED with FlowFiles still queued.
+     *
+     * NOTE(review): presumably the absent gate file is what keeps the drain from completing; confirm against
+     * GatedDataQueuingConnector.
+     */
+    @Test
+    public void testCancelDrainFlowFiles() throws NiFiClientException, IOException, InterruptedException {
+        final File gateFile = new File(getNiFiInstance().getInstanceDirectory(), "gate-file-cancel.txt");
+        gateFile.deleteOnExit(); // request removal at JVM exit in case a file ends up at this path
+
+        if (gateFile.exists()) { // start from a state where the gate file is absent
+            assertTrue(gateFile.delete());
+        }
+
+        logger.info("Creating GatedDataQueuingConnector");
+        final ConnectorEntity connector = getClientUtil().createConnector("GatedDataQueuingConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        logger.info("Configuring connector {} with gate file path: {}", connectorId, gateFile.getAbsolutePath());
+        getClientUtil().configureConnector(connector, "Gate Configuration", Map.of("Gate File Path", gateFile.getAbsolutePath()));
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Waiting for at least 5 FlowFiles to queue");
+        waitForConnectorMinQueueCount(connectorId, 5); // there must be queued data before a drain is worth testing
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final int queuedCountBeforeDrain = getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count before drain: {}", queuedCountBeforeDrain);
+        assertTrue(queuedCountBeforeDrain > 0);
+
+        logger.info("Initiating drain for connector {}", connectorId);
+        getClientUtil().drainConnector(connectorId);
+
+        logger.info("Waiting for connector to enter DRAINING state");
+        getClientUtil().waitForConnectorDraining(connectorId);
+
+        logger.info("Sleeping for 2 seconds to verify connector remains in DRAINING state");
+        Thread.sleep(2000L); // fixed pause; the assertions below check that the drain has not finished in that time
+
+        ConnectorEntity drainingConnector = getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Connector state after 2 seconds: {}", drainingConnector.getComponent().getState());
+        assertEquals(ConnectorState.DRAINING.name(), drainingConnector.getComponent().getState()); // state is compared by enum name
+
+        final int queuedCountWhileDraining = getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count while draining: {}", queuedCountWhileDraining);
+        assertTrue(queuedCountWhileDraining > 0);
+
+        logger.info("Canceling drain for connector {}", connectorId);
+        getClientUtil().cancelDrain(connectorId);
+
+        logger.info("Waiting for connector to return to STOPPED state after cancel");
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final ConnectorEntity finalConnector = getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Final connector state: {}", finalConnector.getComponent().getState());
+        assertEquals(ConnectorState.STOPPED.name(), finalConnector.getComponent().getState());
+
+        final int queuedCountAfterCancel = getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count after cancel (should still have data): {}", queuedCountAfterCancel);
+        assertTrue(queuedCountAfterCancel > 0); // the queue must not be empty once the drain has been canceled
+
+        logger.info("testCancelDrainFlowFiles completed successfully");
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index 1e95222225..76dfefaeaa 100644
--- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -131,6 +131,50 @@ public interface ConnectorClient {
      */
     ConnectorEntity stopConnector(ConnectorEntity connectorEntity) throws NiFiClientException, IOException;
 
+    /**
+     * Initiates draining of FlowFiles for a connector.
+     *
+     * <p>Implementation note: {@code JerseyConnectorClient} issues this as a POST to the {@code /drain} path of its
+     * connector target, sends {@code disconnectedNodeAcknowledged} as {@code false}, and throws
+     * {@link IllegalArgumentException} when the connector ID is null or blank.</p>
+     *
+     * @param connectorId the connector ID
+     * @param clientId the client ID
+     * @param version the revision version
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity drainConnector(String connectorId, String clientId, long version) throws NiFiClientException, IOException;
+
+    /**
+     * Initiates draining of FlowFiles for a connector using the information from the entity.
+     *
+     * @param connectorEntity the connector entity to drain; {@code JerseyConnectorClient} reads its ID, the client ID
+     *            and version of its revision, and its disconnected-node acknowledgement
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity drainConnector(ConnectorEntity connectorEntity) throws NiFiClientException, IOException;
+
+    /**
+     * Cancels an ongoing drain operation for a connector.
+     *
+     * <p>Implementation note: {@code JerseyConnectorClient} issues this as a DELETE to the {@code /drain} path of its
+     * connector target with {@code version} and {@code clientId} query parameters, and throws
+     * {@link IllegalArgumentException} when the connector ID is null or blank.</p>
+     *
+     * @param connectorId the connector ID
+     * @param clientId the client ID
+     * @param version the revision version
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity cancelDrain(String connectorId, String clientId, long version) throws NiFiClientException, IOException;
+
+    /**
+     * Cancels an ongoing drain operation for a connector using the information from the entity.
+     *
+     * @param connectorEntity the connector entity to cancel draining for; {@code JerseyConnectorClient} reads its ID,
+     *            the client ID and version of its revision, and its disconnected-node acknowledgement
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity cancelDrain(ConnectorEntity connectorEntity) throws NiFiClientException, IOException;
+
     /**
      * Gets the configuration step names for a connector.
*
diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index b5ad5878d5..4f9b8d8aaa 100644
--- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -164,6 +164,75 @@ public class JerseyConnectorClient extends AbstractJerseyClient implements Conne
                 connectorEntity.getRevision().getVersion(), connectorEntity.isDisconnectedNodeAcknowledged());
     }
 
+    @Override
+    public ConnectorEntity drainConnector(final String connectorId, final String clientId, final long version) throws NiFiClientException, IOException {
+        return drainConnector(connectorId, clientId, version, false);
+    }
+
+    @Override
+    public ConnectorEntity drainConnector(final ConnectorEntity connectorEntity) throws NiFiClientException, IOException {
+        return drainConnector(connectorEntity.getId(), connectorEntity.getRevision().getClientId(),
+                connectorEntity.getRevision().getVersion(), connectorEntity.isDisconnectedNodeAcknowledged());
+    }
+
+    /**
+     * Sends the drain request for a connector as a POST to the {@code /drain} path of the connector target. The
+     * request body is a {@code ConnectorEntity} carrying the connector ID, the revision, and the acknowledgement flag.
+     *
+     * @param connectorId the connector ID; must not be null or blank
+     * @param clientId the client ID placed in the request revision
+     * @param version the revision version placed in the request revision
+     * @param disconnectedNodeAcknowledged value copied onto the request entity as given
+     * @return the connector entity read from the response
+     * @throws IllegalArgumentException if the connector ID is null or blank
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    private ConnectorEntity drainConnector(final String connectorId, final String clientId, final long version,
+            final Boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector ID cannot be null or blank");
+        }
+
+        return executeAction("Error initiating connector drain", () -> {
+            final WebTarget target = connectorTarget
+                    .path("/drain")
+                    .resolveTemplate("id", connectorId);
+
+            final ConnectorEntity requestEntity = new ConnectorEntity();
+            requestEntity.setId(connectorId);
+            requestEntity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged);
+
+            final RevisionDTO revisionDto = new RevisionDTO();
+            revisionDto.setClientId(clientId);
+            revisionDto.setVersion(version);
+            requestEntity.setRevision(revisionDto);
+
+            return getRequestBuilder(target).post(
+                    Entity.entity(requestEntity, MediaType.APPLICATION_JSON_TYPE),
+                    ConnectorEntity.class);
+        });
+    }
+
+    @Override
+    public ConnectorEntity cancelDrain(final String connectorId, final String clientId, final long version) throws NiFiClientException, IOException {
+        return cancelDrain(connectorId, clientId, version, false);
+    }
+
+    @Override
+    public ConnectorEntity cancelDrain(final ConnectorEntity connectorEntity) throws NiFiClientException, IOException {
+        return cancelDrain(connectorEntity.getId(), connectorEntity.getRevision().getClientId(),
+                connectorEntity.getRevision().getVersion(), connectorEntity.isDisconnectedNodeAcknowledged());
+    }
+
+    /**
+     * Sends the cancel-drain request for a connector as a DELETE to the {@code /drain} path of the connector target,
+     * passing the revision as {@code version} and {@code clientId} query parameters.
+     *
+     * @param connectorId the connector ID; must not be null or blank
+     * @param clientId the client ID sent as the {@code clientId} query parameter
+     * @param version the revision version sent as the {@code version} query parameter
+     * @param disconnectedNodeAcknowledged when {@code true}, {@code disconnectedNodeAcknowledged=true} is added to the
+     *            query; {@code null} and {@code false} add nothing
+     * @return the connector entity read from the response
+     * @throws IllegalArgumentException if the connector ID is null or blank
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    private ConnectorEntity cancelDrain(final String connectorId, final String clientId, final long version,
+            final Boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector ID cannot be null or blank");
+        }
+
+        return executeAction("Error canceling connector drain", () -> {
+            WebTarget target = connectorTarget
+                    .path("/drain")
+                    .queryParam("version", version)
+                    .queryParam("clientId", clientId)
+                    .resolveTemplate("id", connectorId);
+
+            // Compare by value: a reference check against Boolean.TRUE misses equal but non-canonical Boolean
+            // instances; equals() is also null-safe when called on the constant.
+            if (Boolean.TRUE.equals(disconnectedNodeAcknowledged)) {
+                target = target.queryParam("disconnectedNodeAcknowledged", "true");
+            }
+
+            return getRequestBuilder(target).delete(ConnectorEntity.class);
+        });
+    }
+
     private ConnectorEntity updateConnectorRunStatus(final String connectorId, final String desiredState, final String clientId, final long version,
             final Boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException {
         if (StringUtils.isBlank(connectorId)) {
