This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b012e9aad2 NIFI-10371: When a component is moved between groups, 
ensure that its versioned component id is unique within the destination group. 
Also ensure that when adding a connection to a PG with the 
VersionedComponentSynchronizer that we prefer obtaining source/destination by 
instance id instead of versioned id.
b012e9aad2 is described below

commit b012e9aad298ad54012b5cda319aef90cbdfa8ab
Author: Mark Payne <[email protected]>
AuthorDate: Wed Aug 17 17:57:35 2022 -0400

    NIFI-10371: When a component is moved between groups, ensure that its 
versioned component id is unique within the destination group. Also ensure that 
when adding a connection to a PG with the VersionedComponentSynchronizer that 
we prefer obtaining source/destination by instance id instead of versioned id.
    
    Fixed bug where ProcessGroup would inadvertently set the wrong component's 
Versioned Component ID to null when there was an ID conflict
    
    This closes #6314
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../StandardVersionedComponentSynchronizer.java    | 19 +++++++-
 .../apache/nifi/groups/StandardProcessGroup.java   | 55 ++++++++++++++++++++++
 .../org/apache/nifi/controller/AbstractPort.java   | 10 ++++
 .../nifi/integration/FrameworkIntegrationTest.java |  4 +-
 .../processgroup/StandardProcessGroupIT.java       | 45 ++++++++++++++++++
 .../resources/int-tests/clustered-nifi.properties  |  2 +-
 .../resources/int-tests/default-nifi.properties    |  2 +-
 7 files changed, 133 insertions(+), 4 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index fa1a07931f..e9c41c21bc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -3072,7 +3072,24 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
     }
 
     private Connectable getConnectable(final ProcessGroup group, final 
ConnectableComponent connectableComponent) {
-        final String id = connectableComponent.getId();
+        // Always prefer the instance identifier, if it's available.
+        final Connectable connectable = getConnectable(group, 
connectableComponent, ConnectableComponent::getInstanceIdentifier);
+        if (connectable != null) {
+            LOG.debug("Found Connectable {} in Process Group {} by Instance ID 
{}", connectable, group, connectableComponent.getInstanceIdentifier());
+            return connectable;
+        }
+
+        // If we're synchronizing and the component is not available by the 
instance ID, lookup the component by the ID instead.
+        final Connectable connectableById = getConnectable(group, 
connectableComponent, ConnectableComponent::getId);
+        LOG.debug("Found no connectable in Process Group {} by Instance ID. 
Lookup by ID {} yielded {}", connectable, connectableComponent.getId(), 
connectableById);
+        return connectableById;
+    }
+
+    private Connectable getConnectable(final ProcessGroup group, final 
ConnectableComponent connectableComponent, final Function<ConnectableComponent, 
String> idFunction) {
+        final String id = idFunction.apply(connectableComponent);
+        if (id == null) {
+            return null;
+        }
 
         switch (connectableComponent.getType()) {
             case FUNNEL:
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3e8ee1ea38..06a83e331a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -612,6 +612,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         try {
             // Unique port check within the same group.
             verifyPortUniqueness(port, inputPorts, this::getInputPortByName);
+            ensureUniqueVersionControlId(port, getInputPorts());
 
             port.setProcessGroup(this);
             inputPorts.put(requireNonNull(port).getIdentifier(), port);
@@ -695,6 +696,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         try {
             // Unique port check within the same group.
             verifyPortUniqueness(port, outputPorts, this::getOutputPortByName);
+            ensureUniqueVersionControlId(port, getOutputPorts());
 
             port.setProcessGroup(this);
             outputPorts.put(port.getIdentifier(), port);
@@ -770,6 +772,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
         writeLock.lock();
         try {
+            ensureUniqueVersionControlId(group, getProcessGroups());
+
             group.setParent(this);
             group.getVariableRegistry().setParent(getVariableRegistry());
 
@@ -877,6 +881,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException("RemoteProcessGroup already 
exists with ID " + remoteGroup.getIdentifier());
             }
 
+            ensureUniqueVersionControlId(remoteGroup, 
getRemoteProcessGroups());
             remoteGroup.setProcessGroup(this);
             
remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), 
remoteGroup);
             onComponentModified();
@@ -958,6 +963,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException("A processor is already 
registered to this ProcessGroup with ID " + processorId);
             }
 
+            ensureUniqueVersionControlId(processor, getProcessors());
+
             processor.setProcessGroup(this);
             processor.getVariableRegistry().setParent(getVariableRegistry());
             processors.put(processorId, processor);
@@ -971,6 +978,50 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
+    /**
+     * A component's Versioned Component ID is used to link a component on the 
canvas to a component in a versioned flow.
+     * There may, however, be multiple instances of the same versioned flow in 
a single NiFi instance. In this case, we will have
+     * multiple components with the same Versioned Component ID. This is 
acceptable as long as no two components within the same Process Group
+     * have the same Versioned Component ID. However, it is not acceptable to 
have two components within the same Process Group that have the same
+     * Versioned Component ID. If this happens, we will have no way to know 
which component in our flow maps to which component in the versioned flow.
+     * We don't have an issue with this when a flow is imported, etc. because 
it is always imported to a new Process Group. However, because it's possible
+     * to move most components between groups, we can have a situation in 
which a component is moved to a higher group, and that can result in a conflict.
+     * In such a case, we handle this by nulling out the Versioned Component 
ID if there is a conflict. This essentially makes NiFi behave as if a component
+     * is copied & pasted instead of being moved whenever a conflict occurs.
+     *
+     * @param component the component whose Versioned Component ID should be 
nulled if there's a conflict
+     * @param componentsToCheck the components to check to determine if 
there's a conflict
+     */
+    private void ensureUniqueVersionControlId(final 
org.apache.nifi.components.VersionedComponent component,
+                                              final Collection<? extends 
org.apache.nifi.components.VersionedComponent> componentsToCheck) {
+        final Optional<String> optionalVersionControlId = 
component.getVersionedComponentId();
+        if (!optionalVersionControlId.isPresent()) {
+            return;
+        }
+
+        final String versionControlId = optionalVersionControlId.get();
+        final boolean duplicateId = 
containsVersionedComponentId(componentsToCheck, versionControlId);
+
+        if (duplicateId) {
+            LOG.debug("Adding {} to {}, found conflicting Version Component ID 
{} so marking Version Component ID of {} as null", component, this, 
versionControlId, component);
+            component.setVersionedComponentId(null);
+        } else {
+            LOG.debug("Adding {} to {}, found no conflicting Version Component 
ID for ID {}", component, this, versionControlId);
+        }
+    }
+
+    private boolean containsVersionedComponentId(final Collection<? extends 
org.apache.nifi.components.VersionedComponent> components, final String id) {
+        for (final org.apache.nifi.components.VersionedComponent component : 
components) {
+            final Optional<String> optionalConnectableId = 
component.getVersionedComponentId();
+            if (optionalConnectableId.isPresent() && 
Objects.equals(optionalConnectableId.get(), id)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
     /**
      * Looks for any property that is configured on the given component that 
references a Controller Service.
      * If any exists, and that Controller Service is not accessible from this 
Process Group, then the given
@@ -1185,6 +1236,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 }
             }
 
+            ensureUniqueVersionControlId(connection, getConnections());
             connection.setProcessGroup(this);
             source.addConnection(connection);
             if (source != destination) {  // don't call addConnection twice if 
it's a self-looping connection.
@@ -1401,6 +1453,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException("A label already exists in 
this ProcessGroup with ID " + label.getIdentifier());
             }
 
+            ensureUniqueVersionControlId(label, getLabels());
             label.setProcessGroup(this);
             labels.put(label.getIdentifier(), label);
             onComponentModified();
@@ -2151,6 +2204,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException("A funnel already exists in 
this ProcessGroup with ID " + funnel.getIdentifier());
             }
 
+            ensureUniqueVersionControlId(funnel, getFunnels());
+
             funnel.setProcessGroup(this);
             funnels.put(funnel.getIdentifier(), funnel);
             flowManager.onFunnelAdded(funnel);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index d42bc729f1..43968c9d66 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -42,6 +42,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -167,6 +168,15 @@ public abstract class AbstractPort implements Port {
 
     @Override
     public void setProcessGroup(final ProcessGroup newGroup) {
+        if (this.processGroup.get() != null && !Objects.equals(newGroup, 
this.processGroup.get())) {
+            // Process Group is changing. For a Port, we effectively want to 
consider this the same as
+            // deleting an old port and creating a new one, in terms of 
tracking the port to a versioned flow.
+            // This ensures that we have a unique versioned component id not 
only in the given process group but also
+            // between the given group and its parent and all children. This 
is important for ports because we can
+            // connect to/from them between Process Groups, so we need to 
ensure unique IDs.
+            versionedComponentId.set(null);
+        }
+
         this.processGroup.set(newGroup);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 965b3ac3e9..553c6356a7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -409,7 +409,9 @@ public class FrameworkIntegrationTest {
         final String uuid = getSimpleTypeName(processorType) + "-" + 
UUID.randomUUID().toString();
         final BundleCoordinate bundleCoordinate = 
SystemBundle.SYSTEM_BUNDLE_COORDINATE;
         final ProcessorNode procNode = 
flowController.getFlowManager().createProcessor(processorType, uuid, 
bundleCoordinate, Collections.emptySet(), true, true, null);
-        destination.addProcessor(procNode);
+        if (destination != null) {
+            destination.addProcessor(procNode);
+        }
 
         return procNode;
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index a2bc54fc44..53cda12c98 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -21,17 +21,20 @@ import 
org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.FrameworkIntegrationTest;
+import org.apache.nifi.integration.processor.BiConsumerProcessor;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.Revision;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -42,8 +45,50 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 public class StandardProcessGroupIT extends FrameworkIntegrationTest {
+
+    @Test
+    public void testConflictingVersionedComponentId() {
+        final ProcessorNode proc1 = 
createProcessorNode(BiConsumerProcessor.class, null);
+        getRootGroup().addProcessor(proc1);
+
+        final ProcessorNode proc2 = 
createProcessorNode(BiConsumerProcessor.class, null);
+        proc2.setVersionedComponentId("aaa");
+        getRootGroup().addProcessor(proc2);
+        // Ensure that id didn't change
+        assertEquals("aaa", proc2.getVersionedComponentId().get());
+
+        final ProcessorNode proc3 = 
createProcessorNode(BiConsumerProcessor.class, null);
+        proc3.setVersionedComponentId("bbb");
+        getRootGroup().addProcessor(proc3);
+        assertEquals("bbb", proc3.getVersionedComponentId().get());
+
+        final ProcessorNode proc4 = 
createProcessorNode(BiConsumerProcessor.class, null);
+        proc4.setVersionedComponentId("bbb");
+        getRootGroup().addProcessor(proc4);
+        // Ensure that versioned component id was nulled out
+        assertFalse(proc4.getVersionedComponentId().isPresent());
+
+        final ProcessGroup childGroup = 
getFlowController().getFlowManager().createProcessGroup("child");
+        childGroup.setName("child");
+        getRootGroup().addProcessGroup(childGroup);
+
+        final ProcessorNode proc5 = 
createProcessorNode(BiConsumerProcessor.class, null);
+        proc5.setVersionedComponentId("bbb");
+        childGroup.addProcessor(proc5);
+        assertEquals("bbb", proc5.getVersionedComponentId().get());
+
+        // Move processor from child group to parent group.
+        // This should null out the ID for proc5 and leave proc3 as is.
+        final StandardSnippet snippet = new StandardSnippet();
+        snippet.addProcessors(Collections.singletonMap(proc5.getIdentifier(), 
new Revision(0L, "abc", proc5.getIdentifier())));
+        childGroup.move(snippet, getRootGroup());
+        assertFalse(proc5.getVersionedComponentId().isPresent());
+        assertEquals("bbb", proc3.getVersionedComponentId().get());
+    }
+
     @Test
     public void testProcessGroupDefaults() {
         // Connect two processors with default settings of the root process 
group
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
index cfbb442c95..95ddce1fc0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
@@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
 
 # Provenance Repository Properties
 
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
-nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.debug.frequency=1000000
 
 # Persistent Provenance Repository Properties
 
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
index 2959474e53..6735a6f4f3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
@@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
 
 # Provenance Repository Properties
 
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
-nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.debug.frequency=1000000
 
 # Persistent Provenance Repository Properties
 
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository

Reply via email to