This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b012e9aad2 NIFI-10371: When a component is moved between groups,
ensure that its versioned component id is unique within the destination group.
Also ensure that when adding a connection to a PG with the
VersionedComponentSynchronizer that we prefer obtaining source/destination by
instance id instead of versioned id.
b012e9aad2 is described below
commit b012e9aad298ad54012b5cda319aef90cbdfa8ab
Author: Mark Payne <[email protected]>
AuthorDate: Wed Aug 17 17:57:35 2022 -0400
NIFI-10371: When a component is moved between groups, ensure that its
versioned component id is unique within the destination group. Also ensure that
when adding a connection to a PG with the VersionedComponentSynchronizer that
we prefer obtaining source/destination by instance id instead of versioned id.
Fixed bug where ProcessGroup would inadvertently set the wrong component's
Versioned Component ID to null when there was an ID conflict
This closes #6314
Signed-off-by: David Handermann <[email protected]>
---
.../StandardVersionedComponentSynchronizer.java | 19 +++++++-
.../apache/nifi/groups/StandardProcessGroup.java | 55 ++++++++++++++++++++++
.../org/apache/nifi/controller/AbstractPort.java | 10 ++++
.../nifi/integration/FrameworkIntegrationTest.java | 4 +-
.../processgroup/StandardProcessGroupIT.java | 45 ++++++++++++++++++
.../resources/int-tests/clustered-nifi.properties | 2 +-
.../resources/int-tests/default-nifi.properties | 2 +-
7 files changed, 133 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index fa1a07931f..e9c41c21bc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -3072,7 +3072,24 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
}
private Connectable getConnectable(final ProcessGroup group, final
ConnectableComponent connectableComponent) {
- final String id = connectableComponent.getId();
+ // Always prefer the instance identifier, if it's available.
+ final Connectable connectable = getConnectable(group,
connectableComponent, ConnectableComponent::getInstanceIdentifier);
+ if (connectable != null) {
+ LOG.debug("Found Connectable {} in Process Group {} by Instance ID
{}", connectable, group, connectableComponent.getInstanceIdentifier());
+ return connectable;
+ }
+
+ // If we're synchronizing and the component is not available by the
instance ID, lookup the component by the ID instead.
+ final Connectable connectableById = getConnectable(group,
connectableComponent, ConnectableComponent::getId);
+ LOG.debug("Found no connectable in Process Group {} by Instance ID.
Lookup by ID {} yielded {}", connectable, connectableComponent.getId(),
connectableById);
+ return connectableById;
+ }
+
+ private Connectable getConnectable(final ProcessGroup group, final
ConnectableComponent connectableComponent, final Function<ConnectableComponent,
String> idFunction) {
+ final String id = idFunction.apply(connectableComponent);
+ if (id == null) {
+ return null;
+ }
switch (connectableComponent.getType()) {
case FUNNEL:
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3e8ee1ea38..06a83e331a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -612,6 +612,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
try {
// Unique port check within the same group.
verifyPortUniqueness(port, inputPorts, this::getInputPortByName);
+ ensureUniqueVersionControlId(port, getInputPorts());
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
@@ -695,6 +696,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
try {
// Unique port check within the same group.
verifyPortUniqueness(port, outputPorts, this::getOutputPortByName);
+ ensureUniqueVersionControlId(port, getOutputPorts());
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
@@ -770,6 +772,8 @@ public final class StandardProcessGroup implements
ProcessGroup {
writeLock.lock();
try {
+ ensureUniqueVersionControlId(group, getProcessGroups());
+
group.setParent(this);
group.getVariableRegistry().setParent(getVariableRegistry());
@@ -877,6 +881,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
throw new IllegalStateException("RemoteProcessGroup already
exists with ID " + remoteGroup.getIdentifier());
}
+ ensureUniqueVersionControlId(remoteGroup,
getRemoteProcessGroups());
remoteGroup.setProcessGroup(this);
remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(),
remoteGroup);
onComponentModified();
@@ -958,6 +963,8 @@ public final class StandardProcessGroup implements
ProcessGroup {
throw new IllegalStateException("A processor is already
registered to this ProcessGroup with ID " + processorId);
}
+ ensureUniqueVersionControlId(processor, getProcessors());
+
processor.setProcessGroup(this);
processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor);
@@ -971,6 +978,50 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
}
+ /**
+ * A component's Versioned Component ID is used to link a component on the
canvas to a component in a versioned flow.
+ * There may, however, be multiple instances of the same versioned flow in
a single NiFi instance. In this case, we will have
+ * multiple components with the same Versioned Component ID. This is
acceptable as long as no two components within the same Process Group
+ * have the same Versioned Component ID. However, it is not acceptable to
have two components within the same Process Group that have the same
+ * Versioned Component ID. If this happens, we will have no way to know
which component in our flow maps to which component in the versioned flow.
+ * We don't have an issue with this when a flow is imported, etc. because
it is always imported to a new Process Group. However, because it's possible
+ * to move most components between groups, we can have a situation in
which a component is moved to a higher group, and that can result in a conflict.
+ * In such a case, we handle this by nulling out the Versioned Component
ID if there is a conflict. This essentially makes NiFi behave as if a component
+ * is copied & pasted instead of being moved whenever a conflict occurs.
+ *
+ * @param component the component whose Versioned Component ID should be
nulled if there's a conflict
+ * @param componentsToCheck the components to check to determine if
there's a conflict
+ */
+ private void ensureUniqueVersionControlId(final
org.apache.nifi.components.VersionedComponent component,
+ final Collection<? extends
org.apache.nifi.components.VersionedComponent> componentsToCheck) {
+ final Optional<String> optionalVersionControlId =
component.getVersionedComponentId();
+ if (!optionalVersionControlId.isPresent()) {
+ return;
+ }
+
+ final String versionControlId = optionalVersionControlId.get();
+ final boolean duplicateId =
containsVersionedComponentId(componentsToCheck, versionControlId);
+
+ if (duplicateId) {
+ LOG.debug("Adding {} to {}, found conflicting Version Component ID
{} so marking Version Component ID of {} as null", component, this,
versionControlId, component);
+ component.setVersionedComponentId(null);
+ } else {
+ LOG.debug("Adding {} to {}, found no conflicting Version Component
ID for ID {}", component, this, versionControlId);
+ }
+ }
+
+ private boolean containsVersionedComponentId(final Collection<? extends
org.apache.nifi.components.VersionedComponent> components, final String id) {
+ for (final org.apache.nifi.components.VersionedComponent component :
components) {
+ final Optional<String> optionalConnectableId =
component.getVersionedComponentId();
+ if (optionalConnectableId.isPresent() &&
Objects.equals(optionalConnectableId.get(), id)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
/**
* Looks for any property that is configured on the given component that
references a Controller Service.
* If any exists, and that Controller Service is not accessible from this
Process Group, then the given
@@ -1185,6 +1236,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
}
+ ensureUniqueVersionControlId(connection, getConnections());
connection.setProcessGroup(this);
source.addConnection(connection);
if (source != destination) { // don't call addConnection twice if
it's a self-looping connection.
@@ -1401,6 +1453,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
throw new IllegalStateException("A label already exists in
this ProcessGroup with ID " + label.getIdentifier());
}
+ ensureUniqueVersionControlId(label, getLabels());
label.setProcessGroup(this);
labels.put(label.getIdentifier(), label);
onComponentModified();
@@ -2151,6 +2204,8 @@ public final class StandardProcessGroup implements
ProcessGroup {
throw new IllegalStateException("A funnel already exists in
this ProcessGroup with ID " + funnel.getIdentifier());
}
+ ensureUniqueVersionControlId(funnel, getFunnels());
+
funnel.setProcessGroup(this);
funnels.put(funnel.getIdentifier(), funnel);
flowManager.onFunnelAdded(funnel);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index d42bc729f1..43968c9d66 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -167,6 +168,15 @@ public abstract class AbstractPort implements Port {
@Override
public void setProcessGroup(final ProcessGroup newGroup) {
+ if (this.processGroup.get() != null && !Objects.equals(newGroup,
this.processGroup.get())) {
+ // Process Group is changing. For a Port, we effectively want to
consider this the same as
+ // deleting an old port and creating a new one, in terms of
tracking the port to a versioned flow.
+ // This ensures that we have a unique versioned component id not
only in the given process group but also
+ // between the given group and its parent and all children. This
is important for ports because we can
+ // connect to/from them between Process Groups, so we need to
ensure unique IDs.
+ versionedComponentId.set(null);
+ }
+
this.processGroup.set(newGroup);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 965b3ac3e9..553c6356a7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -409,7 +409,9 @@ public class FrameworkIntegrationTest {
final String uuid = getSimpleTypeName(processorType) + "-" +
UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate =
SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ProcessorNode procNode =
flowController.getFlowManager().createProcessor(processorType, uuid,
bundleCoordinate, Collections.emptySet(), true, true, null);
- destination.addProcessor(procNode);
+ if (destination != null) {
+ destination.addProcessor(procNode);
+ }
return procNode;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index a2bc54fc44..53cda12c98 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -21,17 +21,20 @@ import
org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.FrameworkIntegrationTest;
+import org.apache.nifi.integration.processor.BiConsumerProcessor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.Revision;
import org.junit.Assert;
import org.junit.Test;
@@ -42,8 +45,50 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
+
+ @Test
+ public void testConflictingVersionedComponentId() {
+ final ProcessorNode proc1 =
createProcessorNode(BiConsumerProcessor.class, null);
+ getRootGroup().addProcessor(proc1);
+
+ final ProcessorNode proc2 =
createProcessorNode(BiConsumerProcessor.class, null);
+ proc2.setVersionedComponentId("aaa");
+ getRootGroup().addProcessor(proc2);
+ // Ensure that id didn't change
+ assertEquals("aaa", proc2.getVersionedComponentId().get());
+
+ final ProcessorNode proc3 =
createProcessorNode(BiConsumerProcessor.class, null);
+ proc3.setVersionedComponentId("bbb");
+ getRootGroup().addProcessor(proc3);
+ assertEquals("bbb", proc3.getVersionedComponentId().get());
+
+ final ProcessorNode proc4 =
createProcessorNode(BiConsumerProcessor.class, null);
+ proc4.setVersionedComponentId("bbb");
+ getRootGroup().addProcessor(proc4);
+ // Ensure that versioned component id was nulled out
+ assertFalse(proc4.getVersionedComponentId().isPresent());
+
+ final ProcessGroup childGroup =
getFlowController().getFlowManager().createProcessGroup("child");
+ childGroup.setName("child");
+ getRootGroup().addProcessGroup(childGroup);
+
+ final ProcessorNode proc5 =
createProcessorNode(BiConsumerProcessor.class, null);
+ proc5.setVersionedComponentId("bbb");
+ childGroup.addProcessor(proc5);
+ assertEquals("bbb", proc5.getVersionedComponentId().get());
+
+ // Move processor from child group to parent group.
+ // This should null out the ID for proc5 and leave proc3 as is.
+ final StandardSnippet snippet = new StandardSnippet();
+ snippet.addProcessors(Collections.singletonMap(proc5.getIdentifier(),
new Revision(0L, "abc", proc5.getIdentifier())));
+ childGroup.move(snippet, getRootGroup());
+ assertFalse(proc5.getVersionedComponentId().isPresent());
+ assertEquals("bbb", proc3.getVersionedComponentId().get());
+ }
+
@Test
public void testProcessGroupDefaults() {
// Connect two processors with default settings of the root process
group
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
index cfbb442c95..95ddce1fc0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
@@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
-nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.debug.frequency=1000000
# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
index 2959474e53..6735a6f4f3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
@@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
-nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.debug.frequency=1000000
# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository