This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new b515661ba4 NIFI-15461: Added ability to initiate drainage of
Connector's FlowFiles and calce; added tests to verify; some bug fixes (#10767)
b515661ba4 is described below
commit b515661ba496bd304079a93a0e0a18b65aecd9c9
Author: Mark Payne <[email protected]>
AuthorDate: Wed Jan 21 14:20:07 2026 -0500
NIFI-15461: Added ability to initiate drainage of Connector's FlowFiles and
calce; added tests to verify; some bug fixes (#10767)
---
.../http/StandardHttpResponseMapper.java | 2 +
.../endpoints/ConnectorStatusEndpointMerger.java | 86 ++++++++
.../ClusteredConnectorRequestReplicator.java | 22 +-
.../cluster/manager/ConnectorEntityMerger.java | 29 ++-
.../apache/nifi/cluster/manager/StatusMerger.java | 15 ++
.../cluster/manager/ConnectorEntityMergerTest.java | 30 +++
.../nifi/components/connector/ConnectorNode.java | 12 ++
.../connector/StandardConnectorNode.java | 83 +++++++-
.../connector/StandardConnectorRepository.java | 22 +-
.../components/connector/StandardFlowContext.java | 5 +
.../connector/TestStandardConnectorNode.java | 118 +++++++++++
.../org/apache/nifi/web/NiFiServiceFacade.java | 8 +
.../apache/nifi/web/StandardNiFiServiceFacade.java | 57 ++++++
.../org/apache/nifi/web/api/ConnectorResource.java | 149 ++++++++++++++
.../java/org/apache/nifi/web/dao/ConnectorDAO.java | 6 +
.../nifi/web/dao/impl/StandardConnectorDAO.java | 18 ++
.../apache/nifi/web/api/TestConnectorResource.java | 64 +++++-
.../tests/system/DataQueuingConnector.java | 3 +-
...nnector.java => GatedDataQueuingConnector.java} | 71 ++++++-
.../processors/tests/system/TerminateFlowFile.java | 28 +++
.../org.apache.nifi.components.connector.Connector | 1 +
.../apache/nifi/tests/system/NiFiClientUtil.java | 24 +++
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 23 +++
.../connectors/ClusteredConnectorDrainIT.java | 228 +++++++++++++++++++++
.../tests/system/connectors/ConnectorDrainIT.java | 178 ++++++++++++++++
.../nifi/toolkit/client/ConnectorClient.java | 44 ++++
.../toolkit/client/impl/JerseyConnectorClient.java | 69 +++++++
27 files changed, 1357 insertions(+), 38 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index daafc89fbc..84711694cf 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -28,6 +28,7 @@ import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectorEndpointMerg
import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectorFlowEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectorPropertyGroupEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectorPropertyGroupNamesEndpointMerger;
+import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectorStatusEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectorsEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
@@ -146,6 +147,7 @@ public class StandardHttpResponseMapper implements
HttpResponseMapper {
endpointMergers.add(new ProcessorRunStatusDetailsEndpointMerger());
endpointMergers.add(new ConnectorEndpointMerger());
endpointMergers.add(new ConnectorsEndpointMerger());
+ endpointMergers.add(new ConnectorStatusEndpointMerger());
endpointMergers.add(new ConnectorFlowEndpointMerger());
endpointMergers.add(new ConnectorPropertyGroupEndpointMerger());
endpointMergers.add(new ConnectorPropertyGroupNamesEndpointMerger());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java
new file mode 100644
index 0000000000..bff04b3fe7
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.ComponentEntityStatusMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ConnectorStatusEndpointMerger extends
AbstractSingleEntityEndpoint<ProcessGroupStatusEntity> implements
ComponentEntityStatusMerger<ProcessGroupStatusDTO> {
+ public static final Pattern CONNECTOR_STATUS_URI_PATTERN =
Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/status");
+
+ @Override
+ public boolean canHandle(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) &&
CONNECTOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ @Override
+ protected Class<ProcessGroupStatusEntity> getEntityClass() {
+ return ProcessGroupStatusEntity.class;
+ }
+
+ @Override
+ protected void mergeResponses(final ProcessGroupStatusEntity clientEntity,
final Map<NodeIdentifier, ProcessGroupStatusEntity> entityMap,
+ final Set<NodeResponse> successfulResponses,
final Set<NodeResponse> problematicResponses) {
+ final ProcessGroupStatusDTO mergedProcessGroupStatus =
clientEntity.getProcessGroupStatus();
+ mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<>());
+
+ final NodeIdentifier selectedNodeId = entityMap.entrySet().stream()
+ .filter(e -> e.getValue() == clientEntity)
+ .map(Map.Entry::getKey)
+ .findFirst()
+ .orElse(null);
+
+ final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new
NodeProcessGroupStatusSnapshotDTO();
+
selectedNodeSnapshot.setStatusSnapshot(mergedProcessGroupStatus.getAggregateSnapshot().clone());
+ selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+ selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+ selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+ mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+ for (final Map.Entry<NodeIdentifier, ProcessGroupStatusEntity> entry :
entityMap.entrySet()) {
+ final NodeIdentifier nodeId = entry.getKey();
+ final ProcessGroupStatusEntity nodeProcessGroupStatusEntity =
entry.getValue();
+ final ProcessGroupStatusDTO nodeProcessGroupStatus =
nodeProcessGroupStatusEntity.getProcessGroupStatus();
+ if (nodeProcessGroupStatus == mergedProcessGroupStatus) {
+ continue;
+ }
+
+ mergeStatus(mergedProcessGroupStatus, clientEntity.getCanRead(),
nodeProcessGroupStatus, nodeProcessGroupStatusEntity.getCanRead(), nodeId);
+ }
+ }
+
+ @Override
+ public void mergeStatus(final ProcessGroupStatusDTO clientStatus, final
boolean clientStatusReadablePermission,
+ final ProcessGroupStatusDTO status, final boolean
statusReadablePermission,
+ final NodeIdentifier statusNodeIdentifier) {
+ StatusMerger.merge(clientStatus, clientStatusReadablePermission,
status, statusReadablePermission,
+ statusNodeIdentifier.getId(),
statusNodeIdentifier.getApiAddress(), statusNodeIdentifier.getApiPort());
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
index 8fdc58dd35..a3db59cf4f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
@@ -24,11 +24,13 @@ import jakarta.ws.rs.core.Response.StatusType;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.connector.ConnectorRequestReplicator;
import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.Entity;
import java.io.IOException;
import java.net.URI;
@@ -55,14 +57,26 @@ public class ClusteredConnectorRequestReplicator implements
ConnectorRequestRepl
public ConnectorState getState(final String connectorId) throws
IOException {
final RequestReplicator requestReplicator = getRequestReplicator();
final NiFiUser nodeUser = getNodeUser();
- final AsyncClusterResponse asyncResponse =
requestReplicator.replicate(nodeUser, GET, URI.create(replicationScheme +
"://localhost/nifi-api/connectors/" + connectorId), Map.of(), Map.of());
+ final URI uri = URI.create(replicationScheme +
"://localhost/nifi-api/connectors/" + connectorId);
+ final AsyncClusterResponse asyncResponse =
requestReplicator.replicate(nodeUser, GET, uri, Map.of(), Map.of());
+
try {
- final Response response =
asyncResponse.awaitMergedResponse().getClientResponse();
+ final NodeResponse mergedNodeResponse =
asyncResponse.awaitMergedResponse();
+ final Response response = mergedNodeResponse.getClientResponse();
verifyResponse(response.getStatusInfo(), connectorId);
- final ConnectorEntity connectorEntity =
response.readEntity(ConnectorEntity.class);
- final String stateName = connectorEntity.getComponent().getState();
+ // Use the merged/updated entity if available, otherwise fall back
to reading from the raw response.
+ // The updatedEntity contains the properly merged state from all
nodes, while readEntity() would
+ // only return the state from whichever single node was selected
as the "client response".
+ final ConnectorEntity connectorEntity;
+ final Entity updatedEntity = mergedNodeResponse.getUpdatedEntity();
+ if (updatedEntity instanceof ConnectorEntity
mergedConnectorEntity) {
+ connectorEntity = mergedConnectorEntity;
+ } else {
+ connectorEntity = response.readEntity(ConnectorEntity.class);
+ }
+ final String stateName = connectorEntity.getComponent().getState();
try {
return ConnectorState.valueOf(stateName);
} catch (final IllegalArgumentException e) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
index 5f880dfc3f..7ef7fdf76b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
@@ -17,6 +17,7 @@
package org.apache.nifi.cluster.manager;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
@@ -26,7 +27,6 @@ import java.util.HashMap;
import java.util.Map;
public class ConnectorEntityMerger {
-
/**
* Merges the ConnectorEntity responses.
*
@@ -43,6 +43,16 @@ public class ConnectorEntityMerger {
}
mergeDtos(clientDto, dtoMap);
+
+ mergeStatus(clientEntity, entityMap);
+ }
+
+ private static void mergeStatus(final ConnectorEntity clientEntity, final
Map<NodeIdentifier, ConnectorEntity> entityMap) {
+ for (final ConnectorEntity nodeEntity : entityMap.values()) {
+ if (nodeEntity != clientEntity && nodeEntity != null) {
+ StatusMerger.merge(clientEntity.getStatus(),
nodeEntity.getStatus());
+ }
+ }
}
private static void mergeDtos(final ConnectorDTO clientDto, final
Map<NodeIdentifier, ConnectorDTO> dtoMap) {
@@ -89,13 +99,16 @@ public class ConnectorEntityMerger {
if (state == null) {
return 0;
}
- return switch (state) {
- case "UPDATE_FAILED" -> 6;
- case "PREPARING_FOR_UPDATE" -> 5;
- case "UPDATING" -> 4;
- case "UPDATED" -> 3;
- case "STARTING", "STOPPING" -> 2;
- default -> 1; // RUNNING, STOPPED, DISABLED
+
+ final ConnectorState connectorState = ConnectorState.valueOf(state);
+ return switch (connectorState) {
+ case UPDATE_FAILED -> 7;
+ case PREPARING_FOR_UPDATE -> 6;
+ case UPDATING -> 5;
+ case DRAINING, PURGING -> 4;
+ case UPDATED -> 3;
+ case STARTING, STOPPING -> 2;
+ default -> 1; // RUNNING, STOPPED
};
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index f238dc1049..5b403e95f8 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -43,6 +43,7 @@ import
org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import
org.apache.nifi.web.api.dto.status.ConnectionStatusPredictionsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.FlowAnalysisRuleStatusDTO;
@@ -1084,4 +1085,18 @@ public class StatusMerger {
target.setValidationStatus(ValidationStatus.INVALID.name());
}
}
+
+ public static void merge(final ConnectorStatusDTO target, final
ConnectorStatusDTO toMerge) {
+ if (target == null || toMerge == null) {
+ return;
+ }
+
+ target.setActiveThreadCount(target.getActiveThreadCount() +
toMerge.getActiveThreadCount());
+
+ if
(ValidationStatus.VALIDATING.name().equalsIgnoreCase(toMerge.getValidationStatus()))
{
+ target.setValidationStatus(ValidationStatus.VALIDATING.name());
+ } else if
(ValidationStatus.INVALID.name().equalsIgnoreCase(toMerge.getValidationStatus()))
{
+ target.setValidationStatus(ValidationStatus.INVALID.name());
+ }
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
index da0ed151f2..21f351b7a9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
@@ -25,6 +25,7 @@ import
org.apache.nifi.web.api.dto.ConnectorPropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.PropertyGroupConfigurationDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.junit.jupiter.api.Test;
@@ -130,6 +131,22 @@ class ConnectorEntityMergerTest {
assertEquals(2, config.getConfigurationStepConfigurations().size());
}
+ @Test
+ void testMergeConnectorStatusValidationStatusPriority() {
+ final ConnectorEntity clientEntity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID");
+ final ConnectorEntity node1Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALIDATING");
+ final ConnectorEntity node2Entity =
createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID");
+
+ final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
+ entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
+ entityMap.put(getNodeIdentifier("node1", 8001), node1Entity);
+ entityMap.put(getNodeIdentifier("node2", 8002), node2Entity);
+
+ ConnectorEntityMerger.merge(clientEntity, entityMap);
+
+ assertEquals("VALIDATING",
clientEntity.getStatus().getValidationStatus());
+ }
+
private NodeIdentifier getNodeIdentifier(final String id, final int port) {
return new NodeIdentifier(id, "localhost", port, "localhost", port +
1, "localhost", port + 2, port + 3, true);
}
@@ -153,6 +170,19 @@ class ConnectorEntityMergerTest {
return entity;
}
+ private ConnectorEntity createConnectorEntityWithStatus(final String id,
final String state, final int activeThreadCount, final String validationStatus)
{
+ final ConnectorEntity entity = createConnectorEntity(id, state);
+
+ final ConnectorStatusDTO status = new ConnectorStatusDTO();
+ status.setId(id);
+ status.setRunStatus(state);
+ status.setActiveThreadCount(activeThreadCount);
+ status.setValidationStatus(validationStatus);
+ entity.setStatus(status);
+
+ return entity;
+ }
+
private ConnectorEntity createConnectorEntityWithConfig(final String id,
final String state,
final Map<String,
List<String>> propertyAllowableValues) {
final ConnectorEntity entity = createConnectorEntity(id, state);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index bb3bdf6e2d..8a39f4b66a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -199,6 +199,18 @@ public interface ConnectorNode extends
ComponentAuthorizable, VersionedComponent
*/
Future<Void> drainFlowFiles();
+ /**
+ * Cancels the draining of FlowFiles that is currently in progress.
+ * @throws IllegalStateException if the Connector is not currently
draining FlowFiles
+ */
+ void cancelDrainFlowFiles();
+
+ /**
+ * Verifies that the Connector can cancel draining FlowFiles.
+ * @throws IllegalStateException if not in a state where draining
FlowFiles can be cancelled
+ */
+ void verifyCancelDrainFlowFiles() throws IllegalStateException;
+
/**
* Purges all FlowFiles from the Connector, immediately dropping the data.
*
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index afb6e37bd9..471fbe10ae 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -41,9 +41,13 @@ import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedConfigurationStep;
import org.apache.nifi.flow.VersionedConnectorValueReference;
+import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
@@ -95,6 +99,7 @@ public class StandardConnectorNode implements ConnectorNode {
private final ConnectorValidationTrigger validationTrigger;
private final boolean extensionMissing;
private volatile boolean triggerValidation = true;
+ private final AtomicReference<CompletableFuture<Void>> drainFutureRef =
new AtomicReference<>();
private volatile FrameworkFlowContext workingFlowContext;
@@ -398,6 +403,7 @@ public class StandardConnectorNode implements ConnectorNode
{
@Override
public Future<Void> stop(final FlowEngine scheduler) {
+ logger.info("Stopping {}", this);
final CompletableFuture<Void> stopCompleteFuture = new
CompletableFuture<>();
stateTransition.setDesiredState(ConnectorState.STOPPED);
@@ -406,7 +412,7 @@ public class StandardConnectorNode implements ConnectorNode
{
while (!stateUpdated) {
final ConnectorState currentState = getCurrentState();
if (currentState == ConnectorState.STOPPED) {
- logger.debug("{} is already {}; will not attempt to stop",
this, currentState);
+ logger.info("{} is already stopped.", this);
stopCompleteFuture.complete(null);
return stopCompleteFuture;
}
@@ -430,8 +436,24 @@ public class StandardConnectorNode implements
ConnectorNode {
requireStopped("drain FlowFiles", ConnectorState.DRAINING);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
connectorDetails.getConnector().getClass(), getIdentifier())) {
- final CompletableFuture<Void> future =
connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
- final CompletableFuture<Void> stateUpdateFuture =
future.whenComplete((result, failureCause) ->
stateTransition.setCurrentState(ConnectorState.STOPPED));
+ getComponentLog().info("Draining FlowFiles from {}", this);
+ final CompletableFuture<Void> drainFuture =
connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
+ drainFutureRef.set(drainFuture);
+
+ final CompletableFuture<Void> stateUpdateFuture =
drainFuture.whenComplete((result, failureCause) -> {
+ drainFutureRef.set(null);
+ logger.info("Successfully drained FlowFiles from {}; ensuring
all components are stopped.", this);
+
+ try {
+ connectorDetails.getConnector().stop(activeFlowContext);
+ } catch (final Exception e) {
+ logger.warn("Failed to stop {} after draining FlowFiles",
this, e);
+ }
+
+ stateTransition.setCurrentState(ConnectorState.STOPPED);
+ logger.info("All components of {} are now stopped after
draining FlowFiles.", this);
+ });
+
return stateUpdateFuture;
} catch (final Throwable t) {
stateTransition.setCurrentState(ConnectorState.STOPPED);
@@ -439,6 +461,30 @@ public class StandardConnectorNode implements
ConnectorNode {
}
}
+ @Override
+ public void cancelDrainFlowFiles() {
+ final Future<Void> future = this.drainFutureRef.getAndSet(null);
+ if (future == null) {
+ logger.debug("No active drain to cancel for {}; drain may have
already completed", this);
+ return;
+ }
+
+ future.cancel(true);
+ logger.info("Cancelled draining of FlowFiles for {}", this);
+ }
+
+ @Override
+ public void verifyCancelDrainFlowFiles() throws IllegalStateException {
+ final ConnectorState state = getCurrentState();
+
+ // Allow if we're currently draining or if we're stopped; if stopped
the cancel drain action will be a no-op
+ // but we don't want to throw an IllegalStateException in that case
because doing so would mean that if one
+ // node in the cluster is stopped while another is draining we cannot
cancel the drain.
+ if (state != ConnectorState.DRAINING && state !=
ConnectorState.STOPPED) {
+ throw new IllegalStateException("Cannot cancel draining of
FlowFiles for " + this + " because its current state is " + state + "; it must
be DRAINING.");
+ }
+ }
+
@Override
public Future<Void> purgeFlowFiles(final String requestor) {
requireStopped("purge FlowFiles", ConnectorState.PURGING);
@@ -610,6 +656,9 @@ public class StandardConnectorNode implements ConnectorNode
{
logger.info("{} has no initial flow to load", this);
} else {
logger.info("Loading initial flow for {}", this);
+ // Update all RUNNING components to ENABLED before applying the
initial flow so that components
+ // are not started before being configured.
+ stopComponents(initialFlow.getFlowContents());
initializationContext.updateFlow(activeFlowContext, initialFlow);
}
@@ -617,6 +666,24 @@ public class StandardConnectorNode implements
ConnectorNode {
recreateWorkingFlowContext();
}
+ private void stopComponents(final VersionedProcessGroup group) {
+ for (final VersionedProcessor processor : group.getProcessors()) {
+ if (processor.getScheduledState() == ScheduledState.RUNNING) {
+ processor.setScheduledState(ScheduledState.ENABLED);
+ }
+ }
+
+ for (final VersionedControllerService service :
group.getControllerServices()) {
+ if (service.getScheduledState() == ScheduledState.RUNNING) {
+ service.setScheduledState(ScheduledState.ENABLED);
+ }
+ }
+
+ for (final VersionedProcessGroup childGroup :
group.getProcessGroups()) {
+ stopComponents(childGroup);
+ }
+ }
+
private void recreateWorkingFlowContext() {
destroyWorkingContext();
workingFlowContext =
flowContextFactory.createWorkingFlowContext(identifier,
@@ -879,6 +946,7 @@ public class StandardConnectorNode implements ConnectorNode
{
actions.add(createDiscardWorkingConfigAction());
actions.add(createPurgeFlowFilesAction(stopped, dataQueued));
actions.add(createDrainFlowFilesAction(stopped, dataQueued));
+ actions.add(createCancelDrainFlowFilesAction(currentState ==
ConnectorState.DRAINING));
actions.add(createApplyUpdatesAction(currentState));
actions.add(createDeleteAction(stopped, dataQueued));
@@ -987,6 +1055,15 @@ public class StandardConnectorNode implements
ConnectorNode {
return new StandardConnectorAction(actionName, description, allowed,
reason);
}
+ private ConnectorAction createCancelDrainFlowFilesAction(final boolean
draining) {
+ if (draining) {
+ return new StandardConnectorAction("CANCEL_DRAIN_FLOWFILES",
"Cancel the ongoing drain of FlowFiles", true, null);
+ }
+
+ return new StandardConnectorAction("CANCEL_DRAIN_FLOWFILES", "Cancel
the ongoing drain of FlowFiles", false,
+ "Connector is not currently draining FlowFiles");
+ }
+
private ConnectorAction createApplyUpdatesAction(final ConnectorState
currentState) {
final boolean allowed;
final String reason;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index 34fe990aa1..02a8c59e52 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -107,11 +106,14 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public void applyUpdate(final ConnectorNode connector, final
ConnectorUpdateContext context) throws FlowUpdateException {
final ConnectorState initialDesiredState = connector.getDesiredState();
+ logger.info("Applying update to Connector {}", connector);
// Transition the connector's state to PREPARING_FOR_UPDATE before
starting the background process.
// This allows us to ensure that if we poll and see the state in the
same state it was in before that
// we know the update has already completed (successfully or
otherwise).
+ logger.debug("Transitioning {} to PREPARING_FOR_UPDATE state before
applying update", connector);
connector.transitionStateForUpdating();
+ logger.debug("{} is now in PREPARING_FOR_UPDATE state", connector);
// Update connector in a background thread. This will handle
transitioning the Connector state appropriately
// so that it's clear when the update has completed.
@@ -121,28 +123,24 @@ public class StandardConnectorRepository implements
ConnectorRepository {
private void updateConnector(final ConnectorNode connector, final
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
try {
// Perform whatever preparation is necessary for the update.
Default implementation is to stop the connector.
+ logger.debug("Preparing {} for update", connector);
connector.prepareForUpdate();
// Wait for Connector State to become UPDATING
+ logger.debug("Waiting for {} to transition to UPDATING state",
connector);
waitForState(connector, Set.of(ConnectorState.UPDATING),
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
// Apply the update to the connector.
+ logger.info("{} has now completed preparations for update;
applying update now", connector);
connector.applyUpdate();
+ logger.info("{} has successfully applied update", connector);
// Now that the update has been applied, save the flow so that the
updated configuration is persisted.
context.saveFlow();
- // Wait for Connector State to become UPDATED, or to revert to the
initial desired state because, depending upon timing,
- // other nodes may have already seen the transition to UPDATED and
moved the connector back to the initial desired state.
- final Set<ConnectorState> desirableStates = new HashSet<>();
- desirableStates.add(initialDesiredState);
- desirableStates.add(ConnectorState.UPDATED);
- if (initialDesiredState == ConnectorState.RUNNING) {
- desirableStates.add(ConnectorState.STARTING);
- } else if (initialDesiredState == ConnectorState.STOPPED) {
- desirableStates.add(ConnectorState.STOPPING);
- }
- waitForState(connector, desirableStates,
Set.of(ConnectorState.UPDATING));
+ // Wait for all nodes to complete the update.
+ waitForState(connector, Set.of(ConnectorState.UPDATED),
Set.of(ConnectorState.UPDATING));
+ logger.info("{} has successfully completed update on all nodes",
connector);
// If the initial desired state was RUNNING, start the connector
again. Otherwise, stop it.
// We don't simply leave it be as the prepareForUpdate / update
may have changed the state of some components.
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
index 16ebbc3e39..70ff4e079d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
@@ -28,6 +28,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import java.util.Collections;
+import java.util.HashMap;
public class StandardFlowContext implements FrameworkFlowContext {
private final ProcessGroup managedProcessGroup;
@@ -76,6 +77,10 @@ public class StandardFlowContext implements
FrameworkFlowContext {
@Override
public void updateFlow(final VersionedExternalFlow versionedExternalFlow,
final AssetManager assetManager) throws FlowUpdateException {
+ if (versionedExternalFlow.getParameterContexts() == null) {
+ versionedExternalFlow.setParameterContexts(new HashMap<>());
+ }
+
final String parameterContextName =
managedProcessGroup.getParameterContext().getName();
updateParameterContext(versionedExternalFlow.getFlowContents(),
parameterContextName);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
index 4eba662d26..d306ac45aa 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -509,6 +510,33 @@ public class TestStandardConnectorNode {
assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState());
}
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
+ public void testCancelDrainFlowFilesInterruptsConnector() throws Exception
{
+ final BusyLoopDrainConnector busyLoopConnector = new
BusyLoopDrainConnector();
+ final StandardConnectorNode connectorNode =
createConnectorNode(busyLoopConnector);
+
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+ connectorNode.drainFlowFiles();
+
+ assertTrue(busyLoopConnector.awaitDrainStarted(2, TimeUnit.SECONDS));
+ assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+ Thread.sleep(1000);
+
+ assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+ assertFalse(busyLoopConnector.wasInterrupted());
+ assertFalse(busyLoopConnector.wasStopCalled());
+
+ connectorNode.cancelDrainFlowFiles();
+
+ assertTrue(busyLoopConnector.awaitDrainCompleted(2, TimeUnit.SECONDS));
+ assertTrue(busyLoopConnector.wasInterrupted());
+ assertTrue(busyLoopConnector.wasStopCalled());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ }
+
private StandardConnectorNode createConnectorNode() throws
FlowUpdateException {
final SleepingConnector sleepingConnector = new
SleepingConnector(Duration.ofMillis(1));
return createConnectorNode(sleepingConnector);
@@ -727,4 +755,94 @@ public class TestStandardConnectorNode {
}
}
+ /**
+ * Test connector that uses a busy loop for draining that never completes
naturally,
+ * but can be interrupted via cancellation.
+ */
+ private static class BusyLoopDrainConnector extends AbstractConnector {
+ private volatile boolean interrupted = false;
+ private volatile boolean stopCalled = false;
+ private final AtomicReference<Thread> drainThreadRef = new
AtomicReference<>();
+ private final CountDownLatch drainStartedLatch = new CountDownLatch(1);
+ private final CountDownLatch drainCompletedLatch = new
CountDownLatch(1);
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ return null;
+ }
+
+ @Override
+ public void prepareForUpdate(final FlowContext workingContext, final
FlowContext activeContext) {
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of();
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final
FlowContext activeContext) {
+ }
+
+ @Override
+ protected void onStepConfigured(final String stepName, final
FlowContext workingContext) {
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final
String stepName, final Map<String, String> overrides, final FlowContext
flowContext) {
+ return List.of();
+ }
+
+ @Override
+ public void stop(final FlowContext activeContext) {
+ stopCalled = true;
+ }
+
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
+ final Thread drainThread = new Thread(() -> {
+ drainStartedLatch.countDown();
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ // Busy loop that never completes naturally
+ }
+ } finally {
+ interrupted = Thread.currentThread().isInterrupted();
+ drainCompletedLatch.countDown();
+ }
+ });
+
+ drainThreadRef.set(drainThread);
+
+ future.whenComplete((result, throwable) -> {
+ final Thread thread = drainThreadRef.get();
+ if (thread != null) {
+ thread.interrupt();
+ }
+ });
+
+ drainThread.start();
+
+ return future;
+ }
+
+ public boolean awaitDrainStarted(final long timeout, final TimeUnit
unit) throws InterruptedException {
+ return drainStartedLatch.await(timeout, unit);
+ }
+
+ public boolean awaitDrainCompleted(final long timeout, final TimeUnit
unit) throws InterruptedException {
+ return drainCompletedLatch.await(timeout, unit);
+ }
+
+ public boolean wasInterrupted() {
+ return interrupted;
+ }
+
+ public boolean wasStopCalled() {
+ return stopCalled;
+ }
+ }
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index fba789287f..23701236b6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -210,6 +210,14 @@ public interface NiFiServiceFacade {
ConnectorEntity scheduleConnector(Revision revision, String id,
ScheduledState state);
+ void verifyDrainConnector(String id);
+
+ ConnectorEntity drainConnector(Revision revision, String id);
+
+ void verifyCancelConnectorDrain(String id);
+
+ ConnectorEntity cancelConnectorDrain(Revision revision, String id);
+
ConfigurationStepNamesEntity getConnectorConfigurationSteps(String id);
ConfigurationStepEntity getConnectorConfigurationStep(String id, String
configurationStepName);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 323630ad03..acf64ea171 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -81,6 +81,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.connector.Connector;
import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.components.connector.ConnectorUpdateContext;
import org.apache.nifi.components.connector.Secret;
import org.apache.nifi.components.connector.secrets.AuthorizableSecret;
@@ -3606,6 +3607,62 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
}
+ @Override
+ public void verifyDrainConnector(final String id) {
+ final ConnectorNode connector = connectorDAO.getConnector(id);
+ final ConnectorState currentState = connector.getCurrentState();
+ if (currentState != ConnectorState.STOPPED) {
+ throw new IllegalStateException("Cannot drain FlowFiles for
Connector " + id + " because it is not currently stopped. Current state: " +
currentState);
+ }
+ }
+
+ @Override
+ public ConnectorEntity drainConnector(final Revision revision, final
String id) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ final RevisionUpdate<ConnectorDTO> snapshot =
revisionManager.updateRevision(claim, user, () -> {
+ connectorDAO.drainFlowFiles(id);
+ controllerFacade.save();
+
+ final ConnectorNode node = connectorDAO.getConnector(id);
+ final ConnectorDTO dto = dtoFactory.createConnectorDto(node);
+ final FlowModification lastMod = new
FlowModification(revision.incrementRevision(revision.getClientId()),
user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastMod);
+ });
+
+ final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
+ final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
+ final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ }
+
+ @Override
+ public void verifyCancelConnectorDrain(final String id) {
+ connectorDAO.verifyCancelDrainFlowFile(id);
+ }
+
+ @Override
+ public ConnectorEntity cancelConnectorDrain(final Revision revision, final
String id) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ final RevisionUpdate<ConnectorDTO> snapshot =
revisionManager.updateRevision(claim, user, () -> {
+ connectorDAO.cancelDrainFlowFiles(id);
+ controllerFacade.save();
+
+ final ConnectorNode node = connectorDAO.getConnector(id);
+ final ConnectorDTO dto = dtoFactory.createConnectorDto(node);
+ final FlowModification lastMod = new
FlowModification(revision.incrementRevision(revision.getClientId()),
user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastMod);
+ });
+
+ final ConnectorNode node =
connectorDAO.getConnector(snapshot.getComponent().getId());
+ final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(node);
+ final PermissionsDTO operatePermissions =
dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
+ return entityFactory.createConnectorEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions,
operatePermissions);
+ }
+
@Override
public ConfigurationStepNamesEntity getConnectorConfigurationSteps(final
String id) {
final ConnectorNode node = connectorDAO.getConnector(id);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index cb5146afdb..f7cd6c50f0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -622,6 +622,155 @@ public class ConnectorResource extends
ApplicationResource {
);
}
+ /**
+ * Initiates the draining of FlowFiles for the specified connector.
+ *
+ * @param id The id of the connector to drain.
+ * @param requestConnectorEntity A connectorEntity containing the revision.
+ * @return A connectorEntity.
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/drain")
+ @Operation(
+ summary = "Initiates draining of FlowFiles for a connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = ConnectorEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ description = "This will initiate draining of FlowFiles for a
stopped connector. Draining allows the connector to process " +
+ "data that is currently in the flow but does not ingest
any additional data. The connector must be in a STOPPED state " +
+ "before draining can begin. Once initiated, the connector
will transition to a DRAINING state. Use the DELETE method " +
+ "on this endpoint to cancel an ongoing drain operation.",
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid} or
/operation/connectors/{uuid}")
+ }
+ )
+ public Response initiateDrain(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String id,
+ @Parameter(
+ description = "The connector entity with revision.",
+ required = true
+ ) final ConnectorEntity requestConnectorEntity) {
+
+ if (requestConnectorEntity == null ||
requestConnectorEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Connector entity with revision
must be specified.");
+ }
+
+ if (requestConnectorEntity.getId() != null &&
!id.equals(requestConnectorEntity.getId())) {
+ throw new IllegalArgumentException(String.format("The connector id
(%s) in the request body does not equal the "
+ + "connector id of the requested resource (%s).",
requestConnectorEntity.getId(), id));
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST, requestConnectorEntity);
+ } else if (isDisconnectedFromCluster()) {
+
verifyDisconnectedNodeModification(requestConnectorEntity.isDisconnectedNodeAcknowledged());
+ }
+
+ final Revision requestRevision = getRevision(requestConnectorEntity,
id);
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ requestRevision,
+ lookup -> {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final Authorizable connector = lookup.getConnector(id);
+ OperationAuthorizable.authorizeOperation(connector,
authorizer, user);
+ },
+ () -> serviceFacade.verifyDrainConnector(id),
+ (revision, connectorEntity) -> {
+ final ConnectorEntity entity =
serviceFacade.drainConnector(revision, id);
+ populateRemainingConnectorEntityContent(entity);
+
+ return generateOkResponse(entity).build();
+ }
+ );
+ }
+
+ /**
+ * Cancels the draining of FlowFiles for the specified connector.
+ *
+ * @param version The revision is used to verify the client is working
with the latest version of the flow.
+ * @param clientId Optional client id. If the client id is not specified,
a new one will be generated. This value (whether specified or generated) is
included in the response.
+ * @param id The id of the connector to cancel draining for.
+ * @return A connectorEntity.
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/drain")
+ @Operation(
+ summary = "Cancels the draining of FlowFiles for a connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = ConnectorEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid} or
/operation/connectors/{uuid}")
+ }
+ )
+ public Response cancelDrain(
+ @Parameter(
+ description = "The revision is used to verify the client
is working with the latest version of the flow."
+ )
+ @QueryParam(VERSION) final LongParameter version,
+ @Parameter(
+ description = "If the client id is not specified, new one
will be generated. This value (whether specified or generated) is included in
the response."
+ )
+ @QueryParam(CLIENT_ID) final ClientIdParameter clientId,
+ @Parameter(
+ description = "Acknowledges that this node is disconnected
to allow for mutable requests to proceed."
+ )
+ @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false")
final Boolean disconnectedNodeAcknowledged,
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String id) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ } else if (isDisconnectedFromCluster()) {
+ verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(id);
+
+ final Revision requestRevision = new Revision(version == null ? null :
version.getLong(), clientId.getClientId(), id);
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ requestRevision,
+ lookup -> {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final Authorizable connector = lookup.getConnector(id);
+ OperationAuthorizable.authorizeOperation(connector,
authorizer, user);
+ },
+ () -> serviceFacade.verifyCancelConnectorDrain(id),
+ (revision, connectorEntity) -> {
+ final ConnectorEntity entity =
serviceFacade.cancelConnectorDrain(revision, id);
+ populateRemainingConnectorEntityContent(entity);
+
+ return generateOkResponse(entity).build();
+ }
+ );
+ }
+
/**
* Gets the configuration step names for the specified connector.
*
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
index a987dde126..d094f18ca9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
@@ -45,6 +45,12 @@ public interface ConnectorDAO {
void stopConnector(String id);
+ void drainFlowFiles(String id);
+
+ void cancelDrainFlowFiles(String id);
+
+ void verifyCancelDrainFlowFile(String id);
+
void updateConnectorConfigurationStep(String id, String
configurationStepName, ConfigurationStepConfigurationDTO
configurationStepConfiguration);
void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index e55e6ceee0..19b58dd207 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -127,6 +127,24 @@ public class StandardConnectorDAO implements ConnectorDAO {
getConnectorRepository().stopConnector(connector);
}
+ @Override
+ public void drainFlowFiles(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ connector.drainFlowFiles();
+ }
+
+ @Override
+ public void cancelDrainFlowFiles(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ connector.cancelDrainFlowFiles();
+ }
+
+ @Override
+ public void verifyCancelDrainFlowFile(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ connector.verifyCancelDrainFlowFiles();
+ }
+
@Override
public void updateConnectorConfigurationStep(final String id, final String
configurationStepName, final ConfigurationStepConfigurationDTO
configurationStepDto) {
final ConnectorNode connector = getConnector(id);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
index c218d87d68..60702b8b23 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
@@ -31,8 +31,8 @@ import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AllowableValueDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
@@ -412,12 +412,72 @@ public class TestConnectorResource {
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
assertThrows(AccessDeniedException.class, () ->
-
connectorResource.getControllerServicesFromConnectorProcessGroup(CONNECTOR_ID,
PROCESS_GROUP_ID, true, false, true));
+
connectorResource.getControllerServicesFromConnectorProcessGroup(CONNECTOR_ID,
PROCESS_GROUP_ID, true, false, true));
verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
verify(serviceFacade,
never()).getConnectorControllerServices(anyString(), anyString(), eq(true),
eq(false), eq(true));
}
+ @Test
+ public void testInitiateDrain() {
+ final ConnectorEntity requestEntity = createConnectorEntity();
+ final ConnectorEntity responseEntity = createConnectorEntity();
+ responseEntity.getComponent().setState("DRAINING");
+
+ when(serviceFacade.drainConnector(any(Revision.class),
eq(CONNECTOR_ID))).thenReturn(responseEntity);
+
+ try (Response response = connectorResource.initiateDrain(CONNECTOR_ID,
requestEntity)) {
+ assertEquals(200, response.getStatus());
+ assertEquals(responseEntity, response.getEntity());
+ }
+
+ verify(serviceFacade).verifyDrainConnector(CONNECTOR_ID);
+ verify(serviceFacade).drainConnector(any(Revision.class),
eq(CONNECTOR_ID));
+ }
+
+ @Test
+ public void testInitiateDrainNotAuthorized() {
+ final ConnectorEntity requestEntity = createConnectorEntity();
+
+
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+ assertThrows(AccessDeniedException.class, () ->
+ connectorResource.initiateDrain(CONNECTOR_ID, requestEntity));
+
+ verify(serviceFacade, never()).verifyDrainConnector(anyString());
+ verify(serviceFacade, never()).drainConnector(any(Revision.class),
anyString());
+ }
+
+ @Test
+ public void testInitiateDrainWithNullEntity() {
+ assertThrows(IllegalArgumentException.class, () ->
+ connectorResource.initiateDrain(CONNECTOR_ID, null));
+
+ verify(serviceFacade, never()).drainConnector(any(Revision.class),
anyString());
+ }
+
+ @Test
+ public void testInitiateDrainWithNullRevision() {
+ final ConnectorEntity requestEntity = createConnectorEntity();
+ requestEntity.setRevision(null);
+
+ assertThrows(IllegalArgumentException.class, () ->
+ connectorResource.initiateDrain(CONNECTOR_ID, requestEntity));
+
+ verify(serviceFacade, never()).drainConnector(any(Revision.class),
anyString());
+ }
+
+ @Test
+ public void testInitiateDrainWithMismatchedId() {
+ final ConnectorEntity requestEntity = createConnectorEntity();
+ requestEntity.setId("different-id");
+
+ assertThrows(IllegalArgumentException.class, () ->
+ connectorResource.initiateDrain(CONNECTOR_ID, requestEntity));
+
+ verify(serviceFacade, never()).drainConnector(any(Revision.class),
anyString());
+ }
+
private ConnectorEntity createConnectorEntity() {
final ConnectorEntity entity = new ConnectorEntity();
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
index f3503b7ed1..7666b11e38 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
@@ -50,7 +50,8 @@ public class DataQueuingConnector extends AbstractConnector {
bundle.setVersion("2.7.0-SNAPSHOT");
final VersionedProcessor generate = createVersionedProcessor("gen-1",
"1234", "GenerateFlowFile",
- "org.apache.nifi.processors.tests.system.GenerateFlowFile",
bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING);
+ "org.apache.nifi.processors.tests.system.GenerateFlowFile",
bundle, Map.of("File Size", "1 KB"), ScheduledState.ENABLED);
+ generate.setSchedulingPeriod("100 millis");
final VersionedProcessor terminate =
createVersionedProcessor("term-1", "1234", "TerminateFlowFile",
"org.apache.nifi.processors.tests.system.TerminateFlowFile",
bundle, Collections.emptyMap(), ScheduledState.DISABLED);
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
similarity index 65%
copy from
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
copy to
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
index f3503b7ed1..857e77234f 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
@@ -20,6 +20,10 @@ package org.apache.nifi.connectors.tests.system;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.connector.AbstractConnector;
import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
+import org.apache.nifi.components.connector.ConnectorPropertyGroup;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.PropertyType;
import org.apache.nifi.components.connector.components.FlowContext;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ConnectableComponent;
@@ -30,16 +34,47 @@ import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-public class DataQueuingConnector extends AbstractConnector {
+/**
+ * A connector that generates FlowFiles and allows controlling when the
TerminateFlowFile processor
+ * will process them via a gate file. If the gate file does not exist,
FlowFiles will queue up.
+ * When the gate file exists, the TerminateFlowFile processor will process the
queued FlowFiles.
+ * <p>
+ * The connector uses a configuration step to specify the gate file path. The
test must configure
+ * the connector with a gate file path and apply the update before starting
the connector.
+ */
+public class GatedDataQueuingConnector extends AbstractConnector {
+
+ private static final String TERMINATE_PROCESSOR_ID = "term-1";
+
+ static final ConnectorPropertyDescriptor GATE_FILE_PATH = new
ConnectorPropertyDescriptor.Builder()
+ .name("Gate File Path")
+ .description("The path to the gate file. When this file exists,
the TerminateFlowFile processor " +
+ "will process FlowFiles. When it does not exist, FlowFiles
will queue up.")
+ .required(true)
+ .type(PropertyType.STRING)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private static final ConnectorPropertyGroup PROPERTY_GROUP = new
ConnectorPropertyGroup.Builder()
+ .name("Gate Configuration")
+ .addProperty(GATE_FILE_PATH)
+ .build();
+
+ private static final ConfigurationStep CONFIG_STEP = new
ConfigurationStep.Builder()
+ .name("Gate Configuration")
+ .propertyGroups(List.of(PROPERTY_GROUP))
+ .build();
+
@Override
protected void onStepConfigured(final String stepName, final FlowContext
workingContext) {
-
}
@Override
@@ -50,10 +85,13 @@ public class DataQueuingConnector extends AbstractConnector
{
bundle.setVersion("2.7.0-SNAPSHOT");
final VersionedProcessor generate = createVersionedProcessor("gen-1",
"1234", "GenerateFlowFile",
- "org.apache.nifi.processors.tests.system.GenerateFlowFile",
bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING);
+ "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle,
+ Map.of("File Size", "1 KB"), ScheduledState.ENABLED);
+ generate.setSchedulingPeriod("100 millis");
- final VersionedProcessor terminate =
createVersionedProcessor("term-1", "1234", "TerminateFlowFile",
- "org.apache.nifi.processors.tests.system.TerminateFlowFile",
bundle, Collections.emptyMap(), ScheduledState.DISABLED);
+ final VersionedProcessor terminate =
createVersionedProcessor(TERMINATE_PROCESSOR_ID, "1234", "TerminateFlowFile",
+ "org.apache.nifi.processors.tests.system.TerminateFlowFile",
bundle,
+ Collections.emptyMap(), ScheduledState.ENABLED);
final ConnectableComponent source = new ConnectableComponent();
source.setId(generate.getIdentifier());
@@ -80,7 +118,7 @@ public class DataQueuingConnector extends AbstractConnector {
connection.setzIndex(1L);
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
- rootGroup.setName("Data Queuing Connector");
+ rootGroup.setName("Gated Data Queuing Connector");
rootGroup.setIdentifier("1234");
rootGroup.setProcessors(Set.of(generate, terminate));
rootGroup.setConnections(Set.of(connection));
@@ -98,11 +136,28 @@ public class DataQueuingConnector extends
AbstractConnector {
@Override
public List<ConfigurationStep> getConfigurationSteps() {
- return List.of();
+ return List.of(CONFIG_STEP);
}
@Override
- public void applyUpdate(final FlowContext workingFlowContext, final
FlowContext activeFlowContext) {
+ public void applyUpdate(final FlowContext workingFlowContext, final
FlowContext activeFlowContext) throws FlowUpdateException {
+ final String gateFilePath =
workingFlowContext.getConfigurationContext().getProperty(CONFIG_STEP,
GATE_FILE_PATH).getValue();
+ if (gateFilePath == null) {
+ return;
+ }
+
+ final VersionedExternalFlow flow = getInitialFlow();
+ final VersionedProcessGroup rootGroup = flow.getFlowContents();
+
+ for (final VersionedProcessor processor : rootGroup.getProcessors()) {
+ if (TERMINATE_PROCESSOR_ID.equals(processor.getIdentifier())) {
+ final Map<String, String> properties = new
HashMap<>(processor.getProperties());
+ properties.put("Gate File", gateFilePath);
+ processor.setProperties(properties);
+ }
+ }
+
+ getInitializationContext().updateFlow(activeFlowContext, flow);
}
private VersionedProcessor createVersionedProcessor(final String
identifier, final String groupIdentifier, final String name,
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
index 734766fa37..9ab6fd71b1 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
@@ -16,15 +16,43 @@
*/
package org.apache.nifi.processors.tests.system;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.util.List;
public class TerminateFlowFile extends AbstractProcessor {
+
+ public static final PropertyDescriptor GATE_FILE = new
PropertyDescriptor.Builder()
+ .name("Gate File")
+ .description("An optional file path. If specified, the processor
will only process FlowFiles when this file exists. " +
+ "If the file does not exist, the processor will yield and
return without processing any data.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(GATE_FILE);
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final String gateFilePath = context.getProperty(GATE_FILE).getValue();
+ if (gateFilePath != null) {
+ final File gateFile = new File(gateFilePath);
+ if (!gateFile.exists()) {
+ context.yield();
+ return;
+ }
+ }
+
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index 34fe7c90ea..3885e4c079 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -16,5 +16,6 @@
org.apache.nifi.connectors.tests.system.NopConnector
org.apache.nifi.connectors.tests.system.AssetConnector
org.apache.nifi.connectors.tests.system.DataQueuingConnector
+org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector
org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
org.apache.nifi.connectors.tests.system.CalculateConnector
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 51272e8b8e..1a3b91c366 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -406,6 +406,16 @@ public class NiFiClientUtil {
}
}
+ public void startConnector(final ConnectorEntity connectorEntity) throws
NiFiClientException, IOException, InterruptedException {
+ startConnector(connectorEntity.getId());
+ }
+
+ public void startConnector(final String connectorId) throws
NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity entity =
getConnectorClient().getConnector(connectorId);
+ getConnectorClient().startConnector(entity);
+ waitForConnectorState(connectorId, ConnectorState.RUNNING);
+ }
+
public void stopConnector(final ConnectorEntity connectorEntity) throws
NiFiClientException, IOException, InterruptedException {
stopConnector(connectorEntity.getId());
}
@@ -437,6 +447,20 @@ public class NiFiClientUtil {
}
}
+ public ConnectorEntity drainConnector(final String connectorId) throws
NiFiClientException, IOException {
+ final ConnectorEntity entity =
getConnectorClient().getConnector(connectorId);
+ return getConnectorClient().drainConnector(entity);
+ }
+
+ public ConnectorEntity cancelDrain(final String connectorId) throws
NiFiClientException, IOException {
+ final ConnectorEntity entity =
getConnectorClient().getConnector(connectorId);
+ return getConnectorClient().cancelDrain(entity);
+ }
+
+ public void waitForConnectorDraining(final String connectorId) throws
NiFiClientException, IOException, InterruptedException {
+ waitForConnectorState(connectorId, ConnectorState.DRAINING);
+ }
+
public ParameterProviderEntity createParameterProvider(final String
simpleTypeName) throws NiFiClientException, IOException {
return
createParameterProvider(NiFiSystemIT.TEST_PARAM_PROVIDERS_PACKAGE + "." +
simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID,
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index b73814250f..93614725e7 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -468,6 +468,29 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
logger.info("Queue Count for Connection {} is now {}", connectionId,
queueSize);
}
+ protected void waitForConnectorMinQueueCount(final String connectorId,
final int minQueueSize) throws InterruptedException {
+ logger.info("Waiting for Queue Count of at least {} in Connector {}",
minQueueSize, connectorId);
+
+ waitFor(() -> {
+ try {
+ final ProcessGroupStatusEntity statusEntity =
getNifiClient().getConnectorClient().getStatus(connectorId, true);
+ final int currentSize =
statusEntity.getProcessGroupStatus().getAggregateSnapshot().getFlowFilesQueued();
+ logEverySecond("Current Queue Size for Connector {} = {},
Waiting for at least {}", connectorId, currentSize, minQueueSize);
+ return currentSize >= minQueueSize;
+ } catch (final Exception e) {
+ logger.error("Failed to get connector queue count", e);
+ return false;
+ }
+ });
+
+ logger.info("Queue Count for Connector {} is now at least {}",
connectorId, minQueueSize);
+ }
+
+ protected int getConnectorQueuedFlowFileCount(final String connectorId)
throws NiFiClientException, IOException {
+ final ProcessGroupStatusEntity statusEntity =
getNifiClient().getConnectorClient().getStatus(connectorId, true);
+ return
statusEntity.getProcessGroupStatus().getAggregateSnapshot().getFlowFilesQueued();
+ }
+
private void waitForQueueCountToMatch(final String connectionId, final
Predicate<Integer> test, final String queueSizeDescription) throws
InterruptedException {
waitFor(() -> {
final ConnectionStatusEntity statusEntity =
getConnectionStatus(connectionId);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java
new file mode 100644
index 0000000000..9ee3fbd09b
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Cluster-specific tests for connector draining functionality.
+ * These tests verify that draining works correctly when nodes complete
draining at different times.
+ */
+public class ClusteredConnectorDrainIT extends NiFiSystemIT {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusteredConnectorDrainIT.class);
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createTwoNodeInstanceFactory();
+ }
+
+ /**
+ * Tests that when draining in a cluster:
+ * 1. Create gate file on Node 1 only - Node 1 finishes draining
(STOPPED), Node 2 still DRAINING
+ * 2. Aggregate state should be DRAINING (since at least one node is still
draining)
+ * 3. Create gate file on Node 2 - both nodes finish draining
+ * 4. Aggregate state should be STOPPED
+ */
+ @Test
+ public void testDrainWithNodeCompletingAtDifferentTimes() throws
NiFiClientException, IOException, InterruptedException {
+ final File node1InstanceDir =
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
+ final File node2InstanceDir =
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
+ final File node1GateFile = new File(node1InstanceDir, "gate-file.txt");
+ final File node2GateFile = new File(node2InstanceDir, "gate-file.txt");
+ node1GateFile.deleteOnExit();
+ node2GateFile.deleteOnExit();
+ deleteIfExists(node1GateFile);
+ deleteIfExists(node2GateFile);
+
+ logger.info("Creating GatedDataQueuingConnector");
+ final ConnectorEntity connector =
getClientUtil().createConnector("GatedDataQueuingConnector");
+ assertNotNull(connector);
+ final String connectorId = connector.getId();
+
+ final String gateFilePath = "./gate-file.txt";
+ logger.info("Configuring connector {} with gate file path: {}",
connectorId, gateFilePath);
+ getClientUtil().configureConnector(connector, "Gate Configuration",
Map.of("Gate File Path", gateFilePath));
+ getClientUtil().applyConnectorUpdate(connector);
+
+ logger.info("Starting connector {}", connectorId);
+ getClientUtil().startConnector(connectorId);
+
+ logger.info("Waiting for at least 5 FlowFiles to queue");
+ waitForConnectorMinQueueCount(connectorId, 5);
+
+ logger.info("Stopping connector {}", connectorId);
+ getClientUtil().stopConnector(connectorId);
+ getClientUtil().waitForConnectorStopped(connectorId);
+
+ final int queuedCountBeforeDrain =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count before drain: {}",
queuedCountBeforeDrain);
+ assertTrue(queuedCountBeforeDrain > 0);
+
+ logger.info("Initiating drain for connector {}", connectorId);
+ getClientUtil().drainConnector(connectorId);
+
+ logger.info("Waiting for aggregate connector state to be DRAINING");
+ getClientUtil().waitForConnectorDraining(connectorId);
+
+ logger.info("Creating gate file for Node 1 only: {}",
node1GateFile.getAbsolutePath());
+ assertTrue(node1GateFile.createNewFile());
+
+ logger.info("Waiting for Node 1 to finish draining and become
STOPPED");
+ waitForNodeConnectorState(1, connectorId, ConnectorState.STOPPED);
+ waitForNodeConnectorState(2, connectorId, ConnectorState.DRAINING);
+
+ logger.info("Verifying aggregate state is still DRAINING (Node 2 still
draining)");
+ final ConnectorEntity aggregateConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ assertEquals(ConnectorState.DRAINING.name(),
aggregateConnector.getComponent().getState());
+
+ logger.info("Creating gate file for Node 2: {}",
node2GateFile.getAbsolutePath());
+ assertTrue(node2GateFile.createNewFile());
+
+ logger.info("Waiting for aggregate state to become STOPPED");
+ waitFor(() -> {
+ try {
+ final ConnectorEntity entity =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ return
ConnectorState.STOPPED.name().equals(entity.getComponent().getState());
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+
+ final ConnectorEntity finalConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ logger.info("Final aggregate connector state: {}",
finalConnector.getComponent().getState());
+ assertEquals(ConnectorState.STOPPED.name(),
finalConnector.getComponent().getState());
+
+ final int finalQueuedCount =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Final queued FlowFile count: {}", finalQueuedCount);
+ assertEquals(0, finalQueuedCount);
+
+ logger.info("testDrainWithNodeCompletingAtDifferentTimes completed
successfully");
+ }
+
+ /**
+ * Tests that when canceling drain in a cluster where one node has already
finished:
+ * 1. Create gate file on Node 1 only - Node 1 finishes draining
(STOPPED), Node 2 still DRAINING
+ * 2. Aggregate state should be DRAINING
+ * 3. Cancel drain - Node 2 should stop draining
+ * 4. Aggregate state should be STOPPED
+ * 5. Data should still be queued (from Node 2)
+ */
+ @Test
+ public void testCancelDrainWithOneNodeAlreadyComplete() throws
NiFiClientException, IOException, InterruptedException {
+ final File node1InstanceDir =
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
+ final File node2InstanceDir =
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
+ final File node1GateFile = new File(node1InstanceDir,
"gate-file-cancel.txt");
+ final File node2GateFile = new File(node2InstanceDir,
"gate-file-cancel.txt");
+ node1GateFile.deleteOnExit();
+ node2GateFile.deleteOnExit();
+ deleteIfExists(node1GateFile);
+ deleteIfExists(node2GateFile);
+
+ logger.info("Creating GatedDataQueuingConnector");
+ final ConnectorEntity connector =
getClientUtil().createConnector("GatedDataQueuingConnector");
+ assertNotNull(connector);
+ final String connectorId = connector.getId();
+
+ final String gateFilePath = "./gate-file-cancel.txt";
+ logger.info("Configuring connector {} with gate file path: {}",
connectorId, gateFilePath);
+ getClientUtil().configureConnector(connector, "Gate Configuration",
Map.of("Gate File Path", gateFilePath));
+ getClientUtil().applyConnectorUpdate(connector);
+
+ logger.info("Starting connector {}", connectorId);
+ getClientUtil().startConnector(connectorId);
+
+ logger.info("Waiting for at least 5 FlowFiles to queue");
+ waitForConnectorMinQueueCount(connectorId, 5);
+
+ logger.info("Stopping connector {}", connectorId);
+ getClientUtil().stopConnector(connectorId);
+ getClientUtil().waitForConnectorStopped(connectorId);
+
+ final int queuedCountBeforeDrain =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count before drain: {}",
queuedCountBeforeDrain);
+ assertTrue(queuedCountBeforeDrain > 0);
+
+ logger.info("Initiating drain for connector {}", connectorId);
+ getClientUtil().drainConnector(connectorId);
+
+ logger.info("Waiting for aggregate connector state to be DRAINING");
+ getClientUtil().waitForConnectorDraining(connectorId);
+
+ logger.info("Creating gate file for Node 1 only: {}",
node1GateFile.getAbsolutePath());
+ assertTrue(node1GateFile.createNewFile());
+
+ logger.info("Waiting for Node 1 to finish draining and become
STOPPED");
+ waitForNodeConnectorState(1, connectorId, ConnectorState.STOPPED);
+
+ logger.info("Verifying aggregate state is still DRAINING (Node 2 still
draining)");
+ final ConnectorEntity aggregateBeforeCancel =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ assertEquals(ConnectorState.DRAINING.name(),
aggregateBeforeCancel.getComponent().getState());
+
+ logger.info("Canceling drain for connector {}", connectorId);
+ getClientUtil().cancelDrain(connectorId);
+
+ logger.info("Waiting for aggregate state to become STOPPED after
cancel");
+ getClientUtil().waitForConnectorStopped(connectorId);
+
+ final ConnectorEntity finalConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ logger.info("Final aggregate connector state: {}",
finalConnector.getComponent().getState());
+ assertEquals(ConnectorState.STOPPED.name(),
finalConnector.getComponent().getState());
+
+ final int finalQueuedCount =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Final queued FlowFile count after cancel (should still
have data from Node 2): {}", finalQueuedCount);
+ assertTrue(finalQueuedCount > 0);
+
+ logger.info("testCancelDrainWithOneNodeAlreadyComplete completed
successfully");
+ }
+
+ private void waitForNodeConnectorState(final int nodeIndex, final String
connectorId, final ConnectorState expectedState) throws InterruptedException {
+ logger.info("Waiting for Node {} connector {} to reach state {}",
nodeIndex, connectorId, expectedState);
+ waitFor(() -> {
+ try {
+ switchClientToNode(nodeIndex);
+ final ConnectorEntity entity =
getNifiClient().getConnectorClient(DO_NOT_REPLICATE).getConnector(connectorId);
+ return
expectedState.name().equals(entity.getComponent().getState());
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+ logger.info("Node {} connector {} reached state {}", nodeIndex,
connectorId, expectedState);
+ }
+
+ private void deleteIfExists(final File file) {
+ if (file.exists()) {
+ assertTrue(file.delete());
+ }
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
new file mode 100644
index 0000000000..eef6c4bf97
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConnectorDrainIT extends NiFiSystemIT {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ConnectorDrainIT.class);
+
+ @Test
+ public void testDrainFlowFiles() throws NiFiClientException, IOException,
InterruptedException {
+ final File gateFile = new
File(getNiFiInstance().getInstanceDirectory(), "gate-file.txt");
+ gateFile.deleteOnExit();
+
+ if (gateFile.exists()) {
+ assertTrue(gateFile.delete());
+ }
+
+ logger.info("Creating GatedDataQueuingConnector");
+ final ConnectorEntity connector =
getClientUtil().createConnector("GatedDataQueuingConnector");
+ assertNotNull(connector);
+ final String connectorId = connector.getId();
+
+ logger.info("Configuring connector {} with gate file path: {}",
connectorId, gateFile.getAbsolutePath());
+ getClientUtil().configureConnector(connector, "Gate Configuration",
Map.of("Gate File Path", gateFile.getAbsolutePath()));
+ getClientUtil().applyConnectorUpdate(connector);
+
+ logger.info("Starting connector {}", connectorId);
+ getClientUtil().startConnector(connectorId);
+
+ logger.info("Waiting for at least 5 FlowFiles to queue");
+ waitForConnectorMinQueueCount(connectorId, 5);
+
+ logger.info("Stopping connector {}", connectorId);
+ getClientUtil().stopConnector(connectorId);
+ getClientUtil().waitForConnectorStopped(connectorId);
+
+ final int queuedCountBeforeDrain =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count before drain: {}",
queuedCountBeforeDrain);
+ assertTrue(queuedCountBeforeDrain > 0);
+
+ logger.info("Initiating drain for connector {}", connectorId);
+ getClientUtil().drainConnector(connectorId);
+
+ logger.info("Waiting for connector to enter DRAINING state");
+ getClientUtil().waitForConnectorDraining(connectorId);
+
+ logger.info("Sleeping for 2 seconds to verify connector remains in
DRAINING state");
+ Thread.sleep(2000L);
+
+ ConnectorEntity drainingConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ logger.info("Connector state after 2 seconds: {}",
drainingConnector.getComponent().getState());
+ assertEquals(ConnectorState.DRAINING.name(),
drainingConnector.getComponent().getState());
+
+ final int queuedWhileDraining =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count while draining (gate file absent):
{}", queuedWhileDraining);
+ assertTrue(queuedWhileDraining > 0);
+
+ logger.info("Creating gate file to allow draining to complete: {}",
gateFile.getAbsolutePath());
+ assertTrue(gateFile.createNewFile());
+
+ logger.info("Waiting for connector to complete draining and return to
STOPPED state");
+ waitFor(() -> {
+ try {
+ final ConnectorEntity entity =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ return
ConnectorState.STOPPED.name().equals(entity.getComponent().getState());
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+
+ final ConnectorEntity finalConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ logger.info("Final connector state: {}",
finalConnector.getComponent().getState());
+ assertEquals(ConnectorState.STOPPED.name(),
finalConnector.getComponent().getState());
+
+ final int finalQueuedCount =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Final queued FlowFile count: {}", finalQueuedCount);
+ assertEquals(0, finalQueuedCount);
+
+ logger.info("testDrainFlowFiles completed successfully");
+ }
+
+ @Test
+ public void testCancelDrainFlowFiles() throws NiFiClientException,
IOException, InterruptedException {
+ final File gateFile = new
File(getNiFiInstance().getInstanceDirectory(), "gate-file-cancel.txt");
+ gateFile.deleteOnExit();
+
+ if (gateFile.exists()) {
+ assertTrue(gateFile.delete());
+ }
+
+ logger.info("Creating GatedDataQueuingConnector");
+ final ConnectorEntity connector =
getClientUtil().createConnector("GatedDataQueuingConnector");
+ assertNotNull(connector);
+ final String connectorId = connector.getId();
+
+ logger.info("Configuring connector {} with gate file path: {}",
connectorId, gateFile.getAbsolutePath());
+ getClientUtil().configureConnector(connector, "Gate Configuration",
Map.of("Gate File Path", gateFile.getAbsolutePath()));
+ getClientUtil().applyConnectorUpdate(connector);
+
+ logger.info("Starting connector {}", connectorId);
+ getClientUtil().startConnector(connectorId);
+
+ logger.info("Waiting for at least 5 FlowFiles to queue");
+ waitForConnectorMinQueueCount(connectorId, 5);
+
+ logger.info("Stopping connector {}", connectorId);
+ getClientUtil().stopConnector(connectorId);
+ getClientUtil().waitForConnectorStopped(connectorId);
+
+ final int queuedCountBeforeDrain =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count before drain: {}",
queuedCountBeforeDrain);
+ assertTrue(queuedCountBeforeDrain > 0);
+
+ logger.info("Initiating drain for connector {}", connectorId);
+ getClientUtil().drainConnector(connectorId);
+
+ logger.info("Waiting for connector to enter DRAINING state");
+ getClientUtil().waitForConnectorDraining(connectorId);
+
+ logger.info("Sleeping for 2 seconds to verify connector remains in
DRAINING state");
+ Thread.sleep(2000L);
+
+ ConnectorEntity drainingConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ logger.info("Connector state after 2 seconds: {}",
drainingConnector.getComponent().getState());
+ assertEquals(ConnectorState.DRAINING.name(),
drainingConnector.getComponent().getState());
+
+ final int queuedCountWhileDraining =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count while draining: {}",
queuedCountWhileDraining);
+ assertTrue(queuedCountWhileDraining > 0);
+
+ logger.info("Canceling drain for connector {}", connectorId);
+ getClientUtil().cancelDrain(connectorId);
+
+ logger.info("Waiting for connector to return to STOPPED state after
cancel");
+ getClientUtil().waitForConnectorStopped(connectorId);
+
+ final ConnectorEntity finalConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ logger.info("Final connector state: {}",
finalConnector.getComponent().getState());
+ assertEquals(ConnectorState.STOPPED.name(),
finalConnector.getComponent().getState());
+
+ final int queuedCountAfterCancel =
getConnectorQueuedFlowFileCount(connectorId);
+ logger.info("Queued FlowFile count after cancel (should still have
data): {}", queuedCountAfterCancel);
+ assertTrue(queuedCountAfterCancel > 0);
+
+ logger.info("testCancelDrainFlowFiles completed successfully");
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index 1e95222225..76dfefaeaa 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -131,6 +131,50 @@ public interface ConnectorClient {
*/
ConnectorEntity stopConnector(ConnectorEntity connectorEntity) throws
NiFiClientException, IOException;
+ /**
+ * Initiates draining of FlowFiles for a connector.
+ *
+ * @param connectorId the connector ID
+ * @param clientId the client ID
+ * @param version the revision version
+ * @return the updated connector entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ConnectorEntity drainConnector(String connectorId, String clientId, long
version) throws NiFiClientException, IOException;
+
+ /**
+ * Initiates draining of FlowFiles for a connector using the information
from the entity.
+ *
+ * @param connectorEntity the connector entity to drain
+ * @return the updated connector entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ConnectorEntity drainConnector(ConnectorEntity connectorEntity) throws
NiFiClientException, IOException;
+
+ /**
+ * Cancels an ongoing drain operation for a connector.
+ *
+ * @param connectorId the connector ID
+ * @param clientId the client ID
+ * @param version the revision version
+ * @return the updated connector entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ConnectorEntity cancelDrain(String connectorId, String clientId, long
version) throws NiFiClientException, IOException;
+
+ /**
+ * Cancels an ongoing drain operation for a connector using the
information from the entity.
+ *
+ * @param connectorEntity the connector entity to cancel draining for
+ * @return the updated connector entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ConnectorEntity cancelDrain(ConnectorEntity connectorEntity) throws
NiFiClientException, IOException;
+
/**
* Gets the configuration step names for a connector.
*
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index b5ad5878d5..4f9b8d8aaa 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -164,6 +164,75 @@ public class JerseyConnectorClient extends
AbstractJerseyClient implements Conne
connectorEntity.getRevision().getVersion(),
connectorEntity.isDisconnectedNodeAcknowledged());
}
+ @Override
+ public ConnectorEntity drainConnector(final String connectorId, final
String clientId, final long version) throws NiFiClientException, IOException {
+ return drainConnector(connectorId, clientId, version, false);
+ }
+
+ @Override
+ public ConnectorEntity drainConnector(final ConnectorEntity
connectorEntity) throws NiFiClientException, IOException {
+ return drainConnector(connectorEntity.getId(),
connectorEntity.getRevision().getClientId(),
+ connectorEntity.getRevision().getVersion(),
connectorEntity.isDisconnectedNodeAcknowledged());
+ }
+
+ private ConnectorEntity drainConnector(final String connectorId, final
String clientId, final long version,
+ final Boolean disconnectedNodeAcknowledged) throws
NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector ID cannot be null or
blank");
+ }
+
+ return executeAction("Error initiating connector drain", () -> {
+ final WebTarget target = connectorTarget
+ .path("/drain")
+ .resolveTemplate("id", connectorId);
+
+ final ConnectorEntity requestEntity = new ConnectorEntity();
+ requestEntity.setId(connectorId);
+
requestEntity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged);
+
+ final RevisionDTO revisionDto = new RevisionDTO();
+ revisionDto.setClientId(clientId);
+ revisionDto.setVersion(version);
+ requestEntity.setRevision(revisionDto);
+
+ return getRequestBuilder(target).post(
+ Entity.entity(requestEntity,
MediaType.APPLICATION_JSON_TYPE),
+ ConnectorEntity.class);
+ });
+ }
+
+ @Override
+ public ConnectorEntity cancelDrain(final String connectorId, final String
clientId, final long version) throws NiFiClientException, IOException {
+ return cancelDrain(connectorId, clientId, version, false);
+ }
+
+ @Override
+ public ConnectorEntity cancelDrain(final ConnectorEntity connectorEntity)
throws NiFiClientException, IOException {
+ return cancelDrain(connectorEntity.getId(),
connectorEntity.getRevision().getClientId(),
+ connectorEntity.getRevision().getVersion(),
connectorEntity.isDisconnectedNodeAcknowledged());
+ }
+
+ private ConnectorEntity cancelDrain(final String connectorId, final String
clientId, final long version,
+ final Boolean disconnectedNodeAcknowledged) throws
NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector ID cannot be null or
blank");
+ }
+
+ return executeAction("Error canceling connector drain", () -> {
+ WebTarget target = connectorTarget
+ .path("/drain")
+ .queryParam("version", version)
+ .queryParam("clientId", clientId)
+ .resolveTemplate("id", connectorId);
+
+ if (disconnectedNodeAcknowledged == Boolean.TRUE) {
+ target = target.queryParam("disconnectedNodeAcknowledged",
"true");
+ }
+
+ return getRequestBuilder(target).delete(ConnectorEntity.class);
+ });
+ }
+
private ConnectorEntity updateConnectorRunStatus(final String connectorId,
final String desiredState, final String clientId,
final long version, final Boolean disconnectedNodeAcknowledged)
throws NiFiClientException, IOException {
if (StringUtils.isBlank(connectorId)) {