This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5b8af8d4090cb8a25dc97b6f2064c174bf1188dd Author: Mark Payne <[email protected]> AuthorDate: Fri Dec 19 15:19:57 2025 -0500 NIFI-15370: Add Connector ID to process groups and do not register create process groups if they have a connector ID. (#10673) --- .../StandardVersionedComponentSynchronizer.java | 6 ++++-- .../java/org/apache/nifi/groups/StandardProcessGroup.java | 9 ++++++++- .../org/apache/nifi/groups/StandardProcessGroupTest.java | 3 ++- .../java/org/apache/nifi/controller/flow/FlowManager.java | 6 +++++- .../src/main/java/org/apache/nifi/groups/ProcessGroup.java | 5 +++++ .../apache/nifi/controller/flow/StandardFlowManager.java | 13 +++++-------- .../nifi/controller/service/mock/MockProcessGroup.java | 5 +++++ .../apache/nifi/stateless/engine/StatelessFlowManager.java | 5 +++-- 8 files changed, 37 insertions(+), 15 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 3466d317e4..3050a7db70 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -1377,7 +1377,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Map<String, VersionedParameterContext> versionedParameterContexts, final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); - final ProcessGroup group = context.getFlowManager().createProcessGroup(id); + final String connectorId = destination.getConnectorIdentifier().orElse(null); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId); group.setVersionedComponentId(proposed.getIdentifier()); group.setParent(destination); group.setName(proposed.getName()); @@ -1949,7 +1950,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final ProcessGroup groupToUpdate; if (processGroup == null) { final String groupId = synchronizationOptions.getComponentIdGenerator().generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), parentGroup.getIdentifier()); - final ProcessGroup group = context.getFlowManager().createProcessGroup(groupId); + final String connectorId = parentGroup.getConnectorIdentifier().orElse(null); + final ProcessGroup group = context.getFlowManager().createProcessGroup(groupId, connectorId); group.setVersionedComponentId(proposed.getIdentifier()); group.setParent(parentGroup); group.setName(proposed.getName()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 658ee0cc9d..9918eda3a1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -173,6 +173,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>(); private static final SecureRandom randomGenerator = new SecureRandom(); + private final String connectorId; private final ProcessScheduler scheduler; private final ControllerServiceProvider controllerServiceProvider; @@ -231,7 +232,7 @@ public final class StandardProcessGroup implements ProcessGroup { final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final ReloadComponent reloadComponent, final NodeTypeProvider nodeTypeProvider, final NiFiProperties nifiProperties, final StatelessGroupNodeFactory statelessGroupNodeFactory, - final AssetManager assetManager) { + final AssetManager assetManager, final String connectorId) { this.id = id; this.controllerServiceProvider = serviceProvider; @@ -245,6 +246,7 @@ public final class StandardProcessGroup implements ProcessGroup { this.reloadComponent = reloadComponent; this.nodeTypeProvider = nodeTypeProvider; this.assetManager = assetManager; + this.connectorId = connectorId; name = new AtomicReference<>(); position = new AtomicReference<>(new Position(0D, 0D)); @@ -338,6 +340,11 @@ public final class StandardProcessGroup implements ProcessGroup { setLoggingAttributes(); } + @Override + public Optional<String> getConnectorIdentifier() { + return Optional.ofNullable(connectorId); + } + @Override public void setPosition(final Position position) { this.position.set(position); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java index b3f888a339..0887de6c90 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java @@ -130,7 +130,8 @@ class StandardProcessGroupTest { nodeTypeProvider, properties, statelessGroupNodeFactory, - assetManager + assetManager, + null ); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java index 266e9888d2..d756e8531e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java @@ -252,7 +252,11 @@ public interface FlowManager extends ParameterProviderLookup { Port createLocalOutputPort(String id, String name); - ProcessGroup createProcessGroup(String id); + default ProcessGroup createProcessGroup(String id) { + return createProcessGroup(id, null); + } + + ProcessGroup createProcessGroup(String id, String connectorId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 821f0a533d..66d66f96da 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -138,6 +138,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi */ void setName(String name); + /** + * @return the ID of the Connector that is responsible for this Process Group, or an empty optional if no Connector is associated + */ + Optional<String> getConnectorIdentifier(); + /** * @return the user-set comments about this ProcessGroup, or * <code>null</code> if no comments have been set diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java index 7cf4bc8707..65e8011b90 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java @@ -298,19 +298,16 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana } @Override - public ProcessGroup createProcessGroup(final String id) { - return createProcessGroup(id, true); - } - - private ProcessGroup createProcessGroup(final String id, final boolean registerGroup) { + public ProcessGroup createProcessGroup(final String id, final String connectorId) { final StatelessGroupNodeFactory statelessGroupNodeFactory = new StandardStatelessGroupNodeFactory(flowController, sslContext, flowController.createKerberosConfig(nifiProperties)); final ProcessGroup group = new StandardProcessGroup(requireNonNull(id), flowController.getControllerServiceProvider(), processScheduler, flowController.getEncryptor(), flowController.getExtensionManager(), flowController.getStateManagerProvider(), this, flowController.getReloadComponent(), flowController, nifiProperties, statelessGroupNodeFactory, - flowController.getAssetManager()); + flowController.getAssetManager(), connectorId); - if (registerGroup) { + // We don't want to register the group if it's being created as part of a Connector + if (connectorId == null) { onProcessGroupAdded(group); } @@ -767,7 +764,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana final StandardConnectorConfigurationContext activeConfigurationContext = new StandardConnectorConfigurationContext( flowController.getConnectorAssetManager(), flowController.getConnectorRepository().getSecretsManager()); - final ProcessGroupFactory processGroupFactory = groupId -> createProcessGroup(groupId, false); + final ProcessGroupFactory processGroupFactory = groupId -> createProcessGroup(groupId, id); final FlowContextFactory flowContextFactory = new FlowControllerFlowContextFactory(flowController, managedRootGroup, activeConfigurationContext, processGroupFactory); final ConnectorNode connectorNode = new ExtensionBuilder() diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index decb1b90a7..8dcba71418 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -127,6 +127,11 @@ public class MockProcessGroup implements ProcessGroup { } + @Override + public Optional<String> getConnectorIdentifier() { + return Optional.empty(); + } + @Override public void setPosition(final Position position) { diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java index 4aed3f8ce5..a637745bb6 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java @@ -226,7 +226,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan } @Override - public ProcessGroup createProcessGroup(final String id) { + public ProcessGroup createProcessGroup(final String id, final String connectorId) { final ProcessGroup created = new StandardProcessGroup(id, statelessEngine.getControllerServiceProvider(), statelessEngine.getProcessScheduler(), statelessEngine.getPropertyEncryptor(), @@ -237,7 +237,8 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan new StatelessNodeTypeProvider(), null, group -> null, - statelessEngine.getAssetManager()); + statelessEngine.getAssetManager(), + connectorId); onProcessGroupAdded(created); return created;
