This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new bb8aa894f1 NIFI-15440: Implementation of ConnectorActions (#10748)
bb8aa894f1 is described below
commit bb8aa894f199051fa133f89ba198870b3b522b9d
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jan 12 14:14:08 2026 -0500
NIFI-15440: Implementation of ConnectorActions (#10748)
* NIFI-15440: Implementation of ConnectorActions
* NIFI-15440: Addressed review feedback
* NIFI-15440: Addressed issues that occurred after rebase
* NIFI-15440: Addressed review feedback
---
.../nifi/web/api/dto/ConnectorActionDTO.java | 70 +++++++++
.../org/apache/nifi/web/api/dto/ConnectorDTO.java | 11 ++
.../nifi/components/connector/ConnectorAction.java | 49 ++++++
.../nifi/components/connector/ConnectorNode.java | 7 +
.../connector/StandardConnectorAction.java | 81 ++++++++++
.../connector/StandardConnectorNode.java | 170 +++++++++++++++++++++
.../StandaloneProcessGroupLifecycle.java | 16 ++
.../connector/StandardConnectorNodeIT.java | 1 +
.../org/apache/nifi/web/api/dto/DtoFactory.java | 17 +++
.../AuthorizingProcessGroupLifecycle.java | 6 +
.../tests/system/DataQueuingConnector.java | 69 +++++++--
11 files changed, 483 insertions(+), 14 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorActionDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorActionDTO.java
new file mode 100644
index 0000000000..97e27fbfcb
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorActionDTO.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.xml.bind.annotation.XmlType;
+
+/**
+ * Represents an action that can be performed on a Connector.
+ */
+@XmlType(name = "connectorAction")
+public class ConnectorActionDTO {
+
+ private String name;
+ private String description;
+ private Boolean allowed;
+ private String reasonNotAllowed;
+
+ @Schema(description = "The name of the action.")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ @Schema(description = "A description of what this action does.")
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(final String description) {
+ this.description = description;
+ }
+
+ @Schema(description = "Whether this action is currently allowed to be
performed, based on the state of the Connector. " +
+ "Note that a value of 'true' does not imply that the
user has permission to perform the action.")
+ public Boolean getAllowed() {
+ return allowed;
+ }
+
+ public void setAllowed(final Boolean allowed) {
+ this.allowed = allowed;
+ }
+
+ @Schema(description = "The reason why this action is not allowed, or null
if the action is allowed.")
+ public String getReasonNotAllowed() {
+ return reasonNotAllowed;
+ }
+
+ public void setReasonNotAllowed(final String reasonNotAllowed) {
+ this.reasonNotAllowed = reasonNotAllowed;
+ }
+}
+
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java
index d2d08590a3..ee891b10af 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java
@@ -20,6 +20,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;
import java.util.Collection;
+import java.util.List;
/**
* Component representing a Connector instance.
@@ -41,6 +42,7 @@ public class ConnectorDTO extends ComponentDTO {
private String configurationUrl;
private String detailsUrl;
+ private List<ConnectorActionDTO> availableActions;
@Schema(description = "The name of the Connector.")
public String getName() {
@@ -158,4 +160,13 @@ public class ConnectorDTO extends ComponentDTO {
public void setExtensionMissing(final Boolean extensionMissing) {
this.extensionMissing = extensionMissing;
}
+
+ @Schema(description = "The available actions that can be performed on this
Connector.")
+ public List<ConnectorActionDTO> getAvailableActions() {
+ return availableActions;
+ }
+
+ public void setAvailableActions(final List<ConnectorActionDTO>
availableActions) {
+ this.availableActions = availableActions;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorAction.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorAction.java
new file mode 100644
index 0000000000..60a4865269
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorAction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+/**
+ * Represents an action that can be performed on a Connector.
+ */
+public interface ConnectorAction {
+
+ /**
+ * Returns the name of this action.
+ * @return the action name
+ */
+ String getName();
+
+ /**
+ * Returns a description of what this action does.
+ * @return the action description
+ */
+ String getDescription();
+
+ /**
+ * Returns whether this action is currently allowed to be performed.
+ * @return true if the action is allowed, false otherwise
+ */
+ boolean isAllowed();
+
+ /**
+ * Returns the reason why this action is not allowed, or null if the
action is allowed.
+ * @return the reason the action is not allowed, or null if allowed
+ */
+ String getReasonNotAllowed();
+}
+
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 146be70d73..bb3bdf6e2d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -244,4 +244,11 @@ public interface ConnectorNode extends
ComponentAuthorizable, VersionedComponent
*/
void inheritConfiguration(List<VersionedConfigurationStep>
activeFlowConfiguration, List<VersionedConfigurationStep>
workingFlowConfiguration,
Bundle flowContextBundle) throws FlowUpdateException;
+
+ /**
+ * Returns the list of available actions that can be performed on this
Connector.
+ * Each action includes whether it is currently allowed and, if not, the
reason why.
+ * @return the list of available actions
+ */
+ List<ConnectorAction> getAvailableActions();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java
new file mode 100644
index 0000000000..3989bbe02c
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import java.util.Objects;
+
+public class StandardConnectorAction implements ConnectorAction {
+
+ private final String name;
+ private final String description;
+ private final boolean allowed;
+ private final String reasonNotAllowed;
+
+ public StandardConnectorAction(final String name, final String
description, final boolean allowed, final String reasonNotAllowed) {
+ this.name = Objects.requireNonNull(name, "name is required");
+ this.description = Objects.requireNonNull(description, "description is
required");
+ this.allowed = allowed;
+ this.reasonNotAllowed = reasonNotAllowed;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public boolean isAllowed() {
+ return allowed;
+ }
+
+ @Override
+ public String getReasonNotAllowed() {
+ return reasonNotAllowed;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ final StandardConnectorAction that = (StandardConnectorAction) other;
+ return allowed == that.allowed
+ && Objects.equals(name, that.name)
+ && Objects.equals(description, that.description)
+ && Objects.equals(reasonNotAllowed, that.reasonNotAllowed);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, description, allowed, reasonNotAllowed);
+ }
+
+ @Override
+ public String toString() {
+ if (reasonNotAllowed == null) {
+ return "StandardConnectorAction[name=" + name + ", allowed=" +
allowed + "]";
+ }
+ return "StandardConnectorAction[name=" + name + ", allowed=" + allowed
+ ", reason=" + reasonNotAllowed + "]";
+ }
+}
+
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 5edd2d3fb1..2bb5793432 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -35,6 +35,7 @@ import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.FlowFileActivity;
import org.apache.nifi.connectable.FlowFileTransferCounts;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
@@ -866,6 +867,175 @@ public class StandardConnectorNode implements
ConnectorNode {
recreateWorkingFlowContext();
}
+ @Override
+ public List<ConnectorAction> getAvailableActions() {
+ final List<ConnectorAction> actions = new ArrayList<>();
+ final ConnectorState currentState = getCurrentState();
+ final boolean dataQueued =
activeFlowContext.getManagedProcessGroup().isDataQueued();
+ final boolean stopped = isStopped();
+
+ actions.add(createStartAction(stopped));
+ actions.add(createStopAction(currentState));
+ actions.add(createConfigureAction());
+ actions.add(createDiscardWorkingConfigAction());
+ actions.add(createPurgeFlowFilesAction(stopped, dataQueued));
+ actions.add(createDrainFlowFilesAction(stopped, dataQueued));
+ actions.add(createApplyUpdatesAction(currentState));
+ actions.add(createDeleteAction(stopped, dataQueued));
+
+ return actions;
+ }
+
+ private boolean isStopped() {
+ final ConnectorState currentState = getCurrentState();
+ if (currentState == ConnectorState.STOPPED) {
+ return true;
+ }
+ if (currentState == ConnectorState.UPDATED || currentState ==
ConnectorState.UPDATE_FAILED) {
+ return
!hasActiveThread(getActiveFlowContext().getManagedProcessGroup());
+ }
+ return false;
+ }
+
+ private ConnectorAction createStartAction(final boolean stopped) {
+ final boolean allowed;
+ final String reason;
+
+ if (!stopped) {
+ allowed = false;
+ reason = "Connector is not stopped";
+ } else {
+ allowed = true;
+ reason = null;
+ }
+
+ return new StandardConnectorAction("START", "Start the connector",
allowed, reason);
+ }
+
+ private ConnectorAction createStopAction(final ConnectorState
currentState) {
+ final boolean allowed;
+ if (currentState == ConnectorState.RUNNING || currentState ==
ConnectorState.STARTING) {
+ allowed = true;
+ } else if (currentState == ConnectorState.UPDATED || currentState ==
ConnectorState.UPDATE_FAILED) {
+ allowed =
hasActiveThread(activeFlowContext.getManagedProcessGroup());
+ } else {
+ allowed = false;
+ }
+
+ final String reason = allowed ? null : "Connector is not running";
+ return new StandardConnectorAction("STOP", "Stop the connector",
allowed, reason);
+ }
+
+ private ConnectorAction createConfigureAction() {
+ return new StandardConnectorAction("CONFIGURE", "Configure the
connector", true, null);
+ }
+
+ private ConnectorAction createDiscardWorkingConfigAction() {
+ final boolean allowed = hasWorkingConfigurationChanges();
+ final String reason = allowed ? null : "No pending changes to discard";
+
+ return new StandardConnectorAction("DISCARD_WORKING_CONFIGURATION",
"Discard any changes made to the working configuration", allowed, reason);
+ }
+
+ private boolean hasActiveThread(final ProcessGroup group) {
+ for (final ProcessorNode processor : group.getProcessors()) {
+ if (processor.getActiveThreadCount() > 0) {
+ return true;
+ }
+ }
+
+ for (final ProcessGroup childGroup : group.getProcessGroups()) {
+ if (hasActiveThread(childGroup)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private ConnectorAction createPurgeFlowFilesAction(final boolean stopped,
final boolean dataQueued) {
+ final boolean allowed;
+ final String reason;
+
+ if (!stopped) {
+ allowed = false;
+ reason = "Connector must be stopped";
+ } else if (!dataQueued) {
+ allowed = false;
+ reason = "No data is queued";
+ } else {
+ allowed = true;
+ reason = null;
+ }
+
+ return new StandardConnectorAction("PURGE_FLOWFILES", "Purge all
FlowFiles from the connector, dropping all data without processing it",
allowed, reason);
+ }
+
+ private ConnectorAction createDrainFlowFilesAction(final boolean stopped,
final boolean dataQueued) {
+ final boolean allowed;
+ final String reason;
+
+ if (!stopped) {
+ allowed = false;
+ reason = "Connector must be stopped";
+ } else if (!dataQueued) {
+ allowed = false;
+ reason = "No data is queued";
+ } else {
+ allowed = true;
+ reason = null;
+ }
+
+ return new StandardConnectorAction("DRAIN_FLOWFILES", "Process data
that is currently in the flow but do not ingest any additional data", allowed,
reason);
+ }
+
+ private ConnectorAction createApplyUpdatesAction(final ConnectorState
currentState) {
+ final boolean allowed;
+ final String reason;
+
+ if (currentState == ConnectorState.PREPARING_FOR_UPDATE ||
currentState == ConnectorState.UPDATING) {
+ allowed = false;
+ reason = "Connector is updating";
+ } else if (!hasWorkingConfigurationChanges()) {
+ allowed = false;
+ reason = "No pending changes";
+ } else {
+ allowed = true;
+ reason = null;
+ }
+
+ return new StandardConnectorAction("APPLY_UPDATES", "Apply the working
configuration to the active configuration", allowed, reason);
+ }
+
+ private ConnectorAction createDeleteAction(final boolean stopped, final
boolean dataQueued) {
+ final boolean allowed;
+ final String reason;
+
+ if (!stopped) {
+ allowed = false;
+ reason = "Connector must be stopped";
+ } else if (dataQueued) {
+ allowed = false;
+ reason = "Data is queued";
+ } else {
+ allowed = true;
+ reason = null;
+ }
+
+ return new StandardConnectorAction("DELETE", "Delete the connector",
allowed, reason);
+ }
+
+ private boolean hasWorkingConfigurationChanges() {
+ final FrameworkFlowContext workingContext = this.workingFlowContext;
+ if (workingContext == null) {
+ return false;
+ }
+
+ final ConnectorConfiguration activeConfig =
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
+ final ConnectorConfiguration workingConfig =
workingContext.getConfigurationContext().toConnectorConfiguration();
+ return !Objects.equals(activeConfig, workingConfig);
+ }
+
@Override
public Authorizable getParentAuthorizable() {
return parentAuthorizable;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
index 29d5baa8b3..95e7f8995d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
@@ -291,4 +291,20 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
return CompletableFuture.allOf(stopFutures.toArray(new
CompletableFuture[0]));
}
+
+ @Override
+ public int getActiveThreadCount() {
+ return getActiveThreadCount(processGroup);
+ }
+
+ private int getActiveThreadCount(final ProcessGroup group) {
+ int total = 0;
+ for (final ProcessorNode processor : group.getProcessors()) {
+ total += processor.getActiveThreadCount();
+ }
+ for (final ProcessGroup childGroup : group.getProcessGroups()) {
+ total += getActiveThreadCount(childGroup);
+ }
+ return total;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index f8e63e2723..336d5a20fb 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -159,6 +159,7 @@ public class StandardConnectorNodeIT {
when(flowController.getFlowFileEventRepository()).thenReturn(mock(FlowFileEventRepository.class));
when(flowController.getConnectorRepository()).thenReturn(connectorRepository);
when(flowController.getValidationTrigger()).thenReturn(mock(ValidationTrigger.class));
+
when(flowController.getConnectorValidationTrigger()).thenReturn(mock(ConnectorValidationTrigger.class));
doAnswer(invocation -> {
return createConnection(invocation.getArgument(0),
invocation.getArgument(1), invocation.getArgument(2),
invocation.getArgument(3), invocation.getArgument(4));
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index cf889dfc2c..231d313886 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -68,6 +68,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.AssetReference;
import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorAction;
import org.apache.nifi.components.connector.ConnectorAssetRepository;
import org.apache.nifi.components.connector.ConnectorConfiguration;
import org.apache.nifi.components.connector.ConnectorNode;
@@ -5266,10 +5267,26 @@ public final class DtoFactory {
dto.setManagedProcessGroupId(activeFlowContext.getManagedProcessGroup().getIdentifier());
dto.setActiveConfiguration(createConnectorConfigurationDtoFromFlowContext(connector,
activeFlowContext));
dto.setWorkingConfiguration(createConnectorConfigurationDtoFromFlowContext(connector,
connector.getWorkingFlowContext()));
+ dto.setAvailableActions(createConnectorActionDtos(connector));
return dto;
}
+ private List<ConnectorActionDTO> createConnectorActionDtos(final
ConnectorNode connector) {
+ return connector.getAvailableActions().stream()
+ .map(this::createConnectorActionDto)
+ .collect(Collectors.toList());
+ }
+
+ private ConnectorActionDTO createConnectorActionDto(final ConnectorAction
action) {
+ final ConnectorActionDTO dto = new ConnectorActionDTO();
+ dto.setName(action.getName());
+ dto.setDescription(action.getDescription());
+ dto.setAllowed(action.isAllowed());
+ dto.setReasonNotAllowed(action.getReasonNotAllowed());
+ return dto;
+ }
+
private ConnectorConfigurationDTO
createConnectorConfigurationDtoFromFlowContext(final ConnectorNode connector,
final FrameworkFlowContext flowContext) {
final List<ConfigurationStep> configurationSteps =
connector.getConfigurationSteps();
if (configurationSteps == null || configurationSteps.isEmpty()) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
index d29a056425..94f822d724 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
@@ -84,5 +84,11 @@ public class AuthorizingProcessGroupLifecycle implements
ProcessGroupLifecycle {
authContext.authorizeWrite();
return delegate.stopProcessors();
}
+
+ @Override
+ public int getActiveThreadCount() {
+ authContext.authorizeRead();
+ return delegate.getActiveThreadCount();
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
index de6701159d..f3503b7ed1 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
@@ -21,14 +21,17 @@ import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.connector.AbstractConnector;
import org.apache.nifi.components.connector.ConfigurationStep;
import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ConnectableComponent;
import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,19 +44,16 @@ public class DataQueuingConnector extends AbstractConnector
{
@Override
public VersionedExternalFlow getInitialFlow() {
- final VersionedProcessor generate = new VersionedProcessor();
- generate.setName("GenerateFlowFile");
-
generate.setType("org.apache.nifi.processors.tests.system.GenerateFlowFile");
- generate.setIdentifier("gen-1");
- generate.setGroupIdentifier("1234");
- generate.setProperties(Map.of("File Size", "1 KB"));
-
- final VersionedProcessor terminate = new VersionedProcessor();
- terminate.setName("TerminateFlowFile");
-
terminate.setType("org.apache.nifi.processors.tests.system.TerminateFlowFile");
- terminate.setIdentifier("term-1");
- terminate.setGroupIdentifier("1234");
- terminate.setScheduledState(ScheduledState.DISABLED);
+ final Bundle bundle = new Bundle();
+ bundle.setGroup("org.apache.nifi");
+ bundle.setArtifact("nifi-system-test-extensions-nar");
+ bundle.setVersion("2.7.0-SNAPSHOT");
+
+ final VersionedProcessor generate = createVersionedProcessor("gen-1",
"1234", "GenerateFlowFile",
+ "org.apache.nifi.processors.tests.system.GenerateFlowFile",
bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING);
+
+ final VersionedProcessor terminate =
createVersionedProcessor("term-1", "1234", "TerminateFlowFile",
+ "org.apache.nifi.processors.tests.system.TerminateFlowFile",
bundle, Collections.emptyMap(), ScheduledState.DISABLED);
final ConnectableComponent source = new ConnectableComponent();
source.setId(generate.getIdentifier());
@@ -66,10 +66,18 @@ public class DataQueuingConnector extends AbstractConnector
{
destination.setGroupId("1234");
final VersionedConnection connection = new VersionedConnection();
+ connection.setIdentifier("generate-to-terminate-1");
connection.setSource(source);
connection.setDestination(destination);
connection.setGroupIdentifier("1234");
- connection.setIdentifier("generate-to-terminate-1");
+ connection.setSelectedRelationships(Set.of("success"));
+ connection.setBackPressureDataSizeThreshold("1 GB");
+ connection.setBackPressureObjectThreshold(10_000L);
+ connection.setBends(Collections.emptyList());
+ connection.setLabelIndex(1);
+ connection.setFlowFileExpiration("0 sec");
+ connection.setPrioritizers(Collections.emptyList());
+ connection.setzIndex(1L);
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
rootGroup.setName("Data Queuing Connector");
@@ -79,6 +87,7 @@ public class DataQueuingConnector extends AbstractConnector {
final VersionedExternalFlow flow = new VersionedExternalFlow();
flow.setFlowContents(rootGroup);
+ flow.setParameterContexts(Collections.emptyMap());
return flow;
}
@@ -95,4 +104,36 @@ public class DataQueuingConnector extends AbstractConnector
{
@Override
public void applyUpdate(final FlowContext workingFlowContext, final
FlowContext activeFlowContext) {
}
+
+ private VersionedProcessor createVersionedProcessor(final String
identifier, final String groupIdentifier, final String name,
+ final String type,
final Bundle bundle, final Map<String, String> properties,
+ final ScheduledState
scheduledState) {
+ final VersionedProcessor processor = new VersionedProcessor();
+ processor.setIdentifier(identifier);
+ processor.setGroupIdentifier(groupIdentifier);
+ processor.setName(name);
+ processor.setType(type);
+ processor.setBundle(bundle);
+ processor.setProperties(properties);
+ processor.setPropertyDescriptors(Collections.emptyMap());
+ processor.setScheduledState(scheduledState);
+
+ processor.setBulletinLevel("WARN");
+ processor.setSchedulingStrategy("TIMER_DRIVEN");
+ processor.setSchedulingPeriod("0 sec");
+ processor.setExecutionNode("ALL");
+ processor.setConcurrentlySchedulableTaskCount(1);
+ processor.setPenaltyDuration("30 sec");
+ processor.setYieldDuration("1 sec");
+ processor.setRunDurationMillis(0L);
+ processor.setPosition(new Position(0, 0));
+
+ processor.setAutoTerminatedRelationships(Collections.emptySet());
+ processor.setRetryCount(10);
+ processor.setRetriedRelationships(Collections.emptySet());
+ processor.setBackoffMechanism("PENALIZE_FLOWFILE");
+ processor.setMaxBackoffPeriod("10 mins");
+
+ return processor;
+ }
}