This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 8332e6db55988866dcd4d96028bab32da7b8d3c7 Author: Mark Payne <[email protected]> AuthorDate: Mon Jan 12 14:14:08 2026 -0500 NIFI-15440: Implementation of ConnectorActions (#10748) * NIFI-15440: Implementation of ConnectorActions * NIFI-15440: Addressed review feedback * NIFI-15440: Addressed issues that occurred after rebase * NIFI-15440: Addressed review feedback --- .../nifi/web/api/dto/ConnectorActionDTO.java | 70 +++++++++ .../org/apache/nifi/web/api/dto/ConnectorDTO.java | 11 ++ .../nifi/components/connector/ConnectorAction.java | 49 ++++++ .../nifi/components/connector/ConnectorNode.java | 7 + .../connector/StandardConnectorAction.java | 81 ++++++++++ .../connector/StandardConnectorNode.java | 170 +++++++++++++++++++++ .../StandaloneProcessGroupLifecycle.java | 16 ++ .../connector/StandardConnectorNodeIT.java | 1 + .../org/apache/nifi/web/api/dto/DtoFactory.java | 17 +++ .../AuthorizingProcessGroupLifecycle.java | 6 + .../tests/system/DataQueuingConnector.java | 69 +++++++-- 11 files changed, 483 insertions(+), 14 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorActionDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorActionDTO.java new file mode 100644 index 0000000000..97e27fbfcb --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorActionDTO.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlType; + +/** + * Represents an action that can be performed on a Connector. + */ +@XmlType(name = "connectorAction") +public class ConnectorActionDTO { + + private String name; + private String description; + private Boolean allowed; + private String reasonNotAllowed; + + @Schema(description = "The name of the action.") + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + @Schema(description = "A description of what this action does.") + public String getDescription() { + return description; + } + + public void setDescription(final String description) { + this.description = description; + } + + @Schema(description = "Whether this action is currently allowed to be performed, based on the state of the Connector. " + + "Note that a value of 'true' does not imply that the user has permission to perform the action.") + public Boolean getAllowed() { + return allowed; + } + + public void setAllowed(final Boolean allowed) { + this.allowed = allowed; + } + + @Schema(description = "The reason why this action is not allowed, or null if the action is allowed.") + public String getReasonNotAllowed() { + return reasonNotAllowed; + } + + public void setReasonNotAllowed(final String reasonNotAllowed) { + this.reasonNotAllowed = reasonNotAllowed; + } +} + diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java index d2d08590a3..ee891b10af 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectorDTO.java @@ -20,6 +20,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import jakarta.xml.bind.annotation.XmlType; import java.util.Collection; +import java.util.List; /** * Component representing a Connector instance. @@ -41,6 +42,7 @@ public class ConnectorDTO extends ComponentDTO { private String configurationUrl; private String detailsUrl; + private List<ConnectorActionDTO> availableActions; @Schema(description = "The name of the Connector.") public String getName() { @@ -158,4 +160,13 @@ public class ConnectorDTO extends ComponentDTO { public void setExtensionMissing(final Boolean extensionMissing) { this.extensionMissing = extensionMissing; } + + @Schema(description = "The available actions that can be performed on this Connector.") + public List<ConnectorActionDTO> getAvailableActions() { + return availableActions; + } + + public void setAvailableActions(final List<ConnectorActionDTO> availableActions) { + this.availableActions = availableActions; + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorAction.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorAction.java new file mode 100644 index 0000000000..60a4865269 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorAction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +/** + * Represents an action that can be performed on a Connector. + */ +public interface ConnectorAction { + + /** + * Returns the name of this action. + * @return the action name + */ + String getName(); + + /** + * Returns a description of what this action does. + * @return the action description + */ + String getDescription(); + + /** + * Returns whether this action is currently allowed to be performed. + * @return true if the action is allowed, false otherwise + */ + boolean isAllowed(); + + /** + * Returns the reason why this action is not allowed, or null if the action is allowed. + * @return the reason the action is not allowed, or null if allowed + */ + String getReasonNotAllowed(); +} + diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java index 146be70d73..bb3bdf6e2d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java @@ -244,4 +244,11 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent */ void inheritConfiguration(List<VersionedConfigurationStep> activeFlowConfiguration, List<VersionedConfigurationStep> workingFlowConfiguration, Bundle flowContextBundle) throws FlowUpdateException; + + /** + * Returns the list of available actions that can be performed on this Connector. + * Each action includes whether it is currently allowed and, if not, the reason why. + * @return the list of available actions + */ + List<ConnectorAction> getAvailableActions(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java new file mode 100644 index 0000000000..3989bbe02c --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Objects; + +public class StandardConnectorAction implements ConnectorAction { + + private final String name; + private final String description; + private final boolean allowed; + private final String reasonNotAllowed; + + public StandardConnectorAction(final String name, final String description, final boolean allowed, final String reasonNotAllowed) { + this.name = Objects.requireNonNull(name, "name is required"); + this.description = Objects.requireNonNull(description, "description is required"); + this.allowed = allowed; + this.reasonNotAllowed = reasonNotAllowed; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public boolean isAllowed() { + return allowed; + } + + @Override + public String getReasonNotAllowed() { + return reasonNotAllowed; + } + + @Override + public boolean equals(final Object other) { + if (other == null || getClass() != other.getClass()) { + return false; + } + final StandardConnectorAction that = (StandardConnectorAction) other; + return allowed == that.allowed + && Objects.equals(name, that.name) + && Objects.equals(description, that.description) + && Objects.equals(reasonNotAllowed, that.reasonNotAllowed); + } + + @Override + public int hashCode() { + return Objects.hash(name, description, allowed, reasonNotAllowed); + } + + @Override + public String toString() { + if (reasonNotAllowed == null) { + return "StandardConnectorAction[name=" + name + ", allowed=" + allowed + "]"; + } + return "StandardConnectorAction[name=" + name + ", allowed=" + allowed + ", reason=" + reasonNotAllowed + "]"; + } +} + diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index 5edd2d3fb1..2bb5793432 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -35,6 +35,7 @@ import org.apache.nifi.components.validation.ValidationState; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.FlowFileActivity; import org.apache.nifi.connectable.FlowFileTransferCounts; +import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; @@ -866,6 +867,175 @@ public class StandardConnectorNode implements ConnectorNode { recreateWorkingFlowContext(); } + @Override + public List<ConnectorAction> getAvailableActions() { + final List<ConnectorAction> actions = new ArrayList<>(); + final ConnectorState currentState = getCurrentState(); + final boolean dataQueued = activeFlowContext.getManagedProcessGroup().isDataQueued(); + final boolean stopped = isStopped(); + + actions.add(createStartAction(stopped)); + actions.add(createStopAction(currentState)); + actions.add(createConfigureAction()); + actions.add(createDiscardWorkingConfigAction()); + actions.add(createPurgeFlowFilesAction(stopped, dataQueued)); + actions.add(createDrainFlowFilesAction(stopped, dataQueued)); + actions.add(createApplyUpdatesAction(currentState)); + actions.add(createDeleteAction(stopped, dataQueued)); + + return actions; + } + + private boolean isStopped() { + final ConnectorState currentState = getCurrentState(); + if (currentState == ConnectorState.STOPPED) { + return true; + } + if (currentState == ConnectorState.UPDATED || currentState == ConnectorState.UPDATE_FAILED) { + return !hasActiveThread(getActiveFlowContext().getManagedProcessGroup()); + } + return false; + } + + private ConnectorAction createStartAction(final boolean stopped) { + final boolean allowed; + final String reason; + + if (!stopped) { + allowed = false; + reason = "Connector is not stopped"; + } else { + allowed = true; + reason = null; + } + + return new StandardConnectorAction("START", "Start the connector", allowed, reason); + } + + private ConnectorAction createStopAction(final ConnectorState currentState) { + final boolean allowed; + if (currentState == ConnectorState.RUNNING || currentState == ConnectorState.STARTING) { + allowed = true; + } else if (currentState == ConnectorState.UPDATED || currentState == ConnectorState.UPDATE_FAILED) { + allowed = hasActiveThread(activeFlowContext.getManagedProcessGroup()); + } else { + allowed = false; + } + + final String reason = allowed ? null : "Connector is not running"; + return new StandardConnectorAction("STOP", "Stop the connector", allowed, reason); + } + + private ConnectorAction createConfigureAction() { + return new StandardConnectorAction("CONFIGURE", "Configure the connector", true, null); + } + + private ConnectorAction createDiscardWorkingConfigAction() { + final boolean allowed = hasWorkingConfigurationChanges(); + final String reason = allowed ? null : "No pending changes to discard"; + + return new StandardConnectorAction("DISCARD_WORKING_CONFIGURATION", "Discard any changes made to the working configuration", allowed, reason); + } + + private boolean hasActiveThread(final ProcessGroup group) { + for (final ProcessorNode processor : group.getProcessors()) { + if (processor.getActiveThreadCount() > 0) { + return true; + } + } + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + if (hasActiveThread(childGroup)) { + return true; + } + } + + return false; + } + + private ConnectorAction createPurgeFlowFilesAction(final boolean stopped, final boolean dataQueued) { + final boolean allowed; + final String reason; + + if (!stopped) { + allowed = false; + reason = "Connector must be stopped"; + } else if (!dataQueued) { + allowed = false; + reason = "No data is queued"; + } else { + allowed = true; + reason = null; + } + + return new StandardConnectorAction("PURGE_FLOWFILES", "Purge all FlowFiles from the connector, dropping all data without processing it", allowed, reason); + } + + private ConnectorAction createDrainFlowFilesAction(final boolean stopped, final boolean dataQueued) { + final boolean allowed; + final String reason; + + if (!stopped) { + allowed = false; + reason = "Connector must be stopped"; + } else if (!dataQueued) { + allowed = false; + reason = "No data is queued"; + } else { + allowed = true; + reason = null; + } + + return new StandardConnectorAction("DRAIN_FLOWFILES", "Process data that is currently in the flow but do not ingest any additional data", allowed, reason); + } + + private ConnectorAction createApplyUpdatesAction(final ConnectorState currentState) { + final boolean allowed; + final String reason; + + if (currentState == ConnectorState.PREPARING_FOR_UPDATE || currentState == ConnectorState.UPDATING) { + allowed = false; + reason = "Connector is updating"; + } else if (!hasWorkingConfigurationChanges()) { + allowed = false; + reason = "No pending changes"; + } else { + allowed = true; + reason = null; + } + + return new StandardConnectorAction("APPLY_UPDATES", "Apply the working configuration to the active configuration", allowed, reason); + } + + private ConnectorAction createDeleteAction(final boolean stopped, final boolean dataQueued) { + final boolean allowed; + final String reason; + + if (!stopped) { + allowed = false; + reason = "Connector must be stopped"; + } else if (dataQueued) { + allowed = false; + reason = "Data is queued"; + } else { + allowed = true; + reason = null; + } + + return new StandardConnectorAction("DELETE", "Delete the connector", allowed, reason); + } + + private boolean hasWorkingConfigurationChanges() { + final FrameworkFlowContext workingContext = this.workingFlowContext; + if (workingContext == null) { + return false; + } + + final ConnectorConfiguration activeConfig = activeFlowContext.getConfigurationContext().toConnectorConfiguration(); + final ConnectorConfiguration workingConfig = workingContext.getConfigurationContext().toConnectorConfiguration(); + return !Objects.equals(activeConfig, workingConfig); + } + @Override public Authorizable getParentAuthorizable() { return parentAuthorizable; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java index 29d5baa8b3..95e7f8995d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java @@ -291,4 +291,20 @@ public class StandaloneProcessGroupLifecycle implements ProcessGroupLifecycle { return CompletableFuture.allOf(stopFutures.toArray(new CompletableFuture[0])); } + + @Override + public int getActiveThreadCount() { + return getActiveThreadCount(processGroup); + } + + private int getActiveThreadCount(final ProcessGroup group) { + int total = 0; + for (final ProcessorNode processor : group.getProcessors()) { + total += processor.getActiveThreadCount(); + } + for (final ProcessGroup childGroup : group.getProcessGroups()) { + total += getActiveThreadCount(childGroup); + } + return total; + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java index f8e63e2723..336d5a20fb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java @@ -159,6 +159,7 @@ public class StandardConnectorNodeIT { when(flowController.getFlowFileEventRepository()).thenReturn(mock(FlowFileEventRepository.class)); when(flowController.getConnectorRepository()).thenReturn(connectorRepository); when(flowController.getValidationTrigger()).thenReturn(mock(ValidationTrigger.class)); + when(flowController.getConnectorValidationTrigger()).thenReturn(mock(ConnectorValidationTrigger.class)); doAnswer(invocation -> { return createConnection(invocation.getArgument(0), invocation.getArgument(1), invocation.getArgument(2), invocation.getArgument(3), invocation.getArgument(4)); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 9e60291f13..652cf93ffd 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -68,6 +68,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.connector.AssetReference; import org.apache.nifi.components.connector.ConfigurationStep; +import org.apache.nifi.components.connector.ConnectorAction; import org.apache.nifi.components.connector.ConnectorAssetRepository; import org.apache.nifi.components.connector.ConnectorConfiguration; import org.apache.nifi.components.connector.ConnectorNode; @@ -5266,10 +5267,26 @@ public final class DtoFactory { dto.setManagedProcessGroupId(activeFlowContext.getManagedProcessGroup().getIdentifier()); dto.setActiveConfiguration(createConnectorConfigurationDtoFromFlowContext(connector, activeFlowContext)); dto.setWorkingConfiguration(createConnectorConfigurationDtoFromFlowContext(connector, connector.getWorkingFlowContext())); + dto.setAvailableActions(createConnectorActionDtos(connector)); return dto; } + private List<ConnectorActionDTO> createConnectorActionDtos(final ConnectorNode connector) { + return connector.getAvailableActions().stream() + .map(this::createConnectorActionDto) + .collect(Collectors.toList()); + } + + private ConnectorActionDTO createConnectorActionDto(final ConnectorAction action) { + final ConnectorActionDTO dto = new ConnectorActionDTO(); + dto.setName(action.getName()); + dto.setDescription(action.getDescription()); + dto.setAllowed(action.isAllowed()); + dto.setReasonNotAllowed(action.getReasonNotAllowed()); + return dto; + } + private ConnectorConfigurationDTO createConnectorConfigurationDtoFromFlowContext(final ConnectorNode connector, final FrameworkFlowContext flowContext) { final List<ConfigurationStep> configurationSteps = connector.getConfigurationSteps(); if (configurationSteps == null || configurationSteps.isEmpty()) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java index d29a056425..94f822d724 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java @@ -84,5 +84,11 @@ public class AuthorizingProcessGroupLifecycle implements ProcessGroupLifecycle { authContext.authorizeWrite(); return delegate.stopProcessors(); } + + @Override + public int getActiveThreadCount() { + authContext.authorizeRead(); + return delegate.getActiveThreadCount(); + } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java index de6701159d..f3503b7ed1 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java @@ -21,14 +21,17 @@ import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.connector.AbstractConnector; import org.apache.nifi.components.connector.ConfigurationStep; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.ConnectableComponentType; +import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,19 +44,16 @@ public class DataQueuingConnector extends AbstractConnector { @Override public VersionedExternalFlow getInitialFlow() { - final VersionedProcessor generate = new VersionedProcessor(); - generate.setName("GenerateFlowFile"); - generate.setType("org.apache.nifi.processors.tests.system.GenerateFlowFile"); - generate.setIdentifier("gen-1"); - generate.setGroupIdentifier("1234"); - generate.setProperties(Map.of("File Size", "1 KB")); - - final VersionedProcessor terminate = new VersionedProcessor(); - terminate.setName("TerminateFlowFile"); - terminate.setType("org.apache.nifi.processors.tests.system.TerminateFlowFile"); - terminate.setIdentifier("term-1"); - terminate.setGroupIdentifier("1234"); - terminate.setScheduledState(ScheduledState.DISABLED); + final Bundle bundle = new Bundle(); + bundle.setGroup("org.apache.nifi"); + bundle.setArtifact("nifi-system-test-extensions-nar"); + bundle.setVersion("2.7.0-SNAPSHOT"); + + final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING); + + final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); final ConnectableComponent source = new ConnectableComponent(); source.setId(generate.getIdentifier()); @@ -66,10 +66,18 @@ public class DataQueuingConnector extends AbstractConnector { destination.setGroupId("1234"); final VersionedConnection connection = new VersionedConnection(); + connection.setIdentifier("generate-to-terminate-1"); connection.setSource(source); connection.setDestination(destination); connection.setGroupIdentifier("1234"); - connection.setIdentifier("generate-to-terminate-1"); + connection.setSelectedRelationships(Set.of("success")); + connection.setBackPressureDataSizeThreshold("1 GB"); + connection.setBackPressureObjectThreshold(10_000L); + connection.setBends(Collections.emptyList()); + connection.setLabelIndex(1); + connection.setFlowFileExpiration("0 sec"); + connection.setPrioritizers(Collections.emptyList()); + connection.setzIndex(1L); final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); rootGroup.setName("Data Queuing Connector"); @@ -79,6 +87,7 @@ public class DataQueuingConnector extends AbstractConnector { final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); + flow.setParameterContexts(Collections.emptyMap()); return flow; } @@ -95,4 +104,36 @@ public class DataQueuingConnector extends AbstractConnector { @Override public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { } + + private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, + final String type, final Bundle bundle, final Map<String, String> properties, + final ScheduledState scheduledState) { + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier(identifier); + processor.setGroupIdentifier(groupIdentifier); + processor.setName(name); + processor.setType(type); + processor.setBundle(bundle); + processor.setProperties(properties); + processor.setPropertyDescriptors(Collections.emptyMap()); + processor.setScheduledState(scheduledState); + + processor.setBulletinLevel("WARN"); + processor.setSchedulingStrategy("TIMER_DRIVEN"); + processor.setSchedulingPeriod("0 sec"); + processor.setExecutionNode("ALL"); + processor.setConcurrentlySchedulableTaskCount(1); + processor.setPenaltyDuration("30 sec"); + processor.setYieldDuration("1 sec"); + processor.setRunDurationMillis(0L); + processor.setPosition(new Position(0, 0)); + + processor.setAutoTerminatedRelationships(Collections.emptySet()); + processor.setRetryCount(10); + processor.setRetriedRelationships(Collections.emptySet()); + processor.setBackoffMechanism("PENALIZE_FLOWFILE"); + processor.setMaxBackoffPeriod("10 mins"); + + return processor; + } }
