Repository: nifi Updated Branches: refs/heads/master 5eb5e96b1 -> 270ce8570
NIFI-5695: Fixed bug that caused ports to not properly map to their correct child group on Flow Import if the child group is independently versioned. This closes #3070. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/270ce857 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/270ce857 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/270ce857 Branch: refs/heads/master Commit: 270ce8570df7a00a26f431d8d8ae0245b898bf69 Parents: 5eb5e96 Author: Mark Payne <[email protected]> Authored: Fri Oct 12 15:27:10 2018 -0400 Committer: Bryan Bende <[email protected]> Committed: Fri Oct 12 16:26:59 2018 -0400 ---------------------------------------------------------------------- .../nifi/groups/StandardProcessGroup.java | 36 ++++++++++++++ .../flow/mapping/NiFiRegistryFlowMapper.java | 51 +++++++++++--------- 2 files changed, 63 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/270ce857/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 43578e4..a683a9e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3933,6 +3933,24 @@ public final class StandardProcessGroup implements 
ProcessGroup { return port.get(); } + // Attempt to locate child group by versioned component id + final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream() + .filter(child -> child.getVersionedComponentId().isPresent()) + .filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId())) + .findFirst(); + + if (optionalSpecifiedGroup.isPresent()) { + final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get(); + return specifiedGroup.getInputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + } + + // If no child group matched the versioned component id, then look at all child groups. This is done because + // in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result, + // if the flow doesn't contain the properly mapped group id, we need to search all child groups. 
return group.getProcessGroups().stream() .flatMap(gr -> gr.getInputPorts().stream()) .filter(component -> component.getVersionedComponentId().isPresent()) @@ -3950,6 +3968,24 @@ public final class StandardProcessGroup implements ProcessGroup { return port.get(); } + // Attempt to locate child group by versioned component id + final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream() + .filter(child -> child.getVersionedComponentId().isPresent()) + .filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId())) + .findFirst(); + + if (optionalSpecifiedGroup.isPresent()) { + final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get(); + return specifiedGroup.getOutputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + } + + // If no child group matched the versioned component id, then look at all child groups. This is done because + // in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result, + // if the flow doesn't contain the properly mapped group id, we need to search all child groups. 
return group.getProcessGroups().stream() .flatMap(gr -> gr.getOutputPorts().stream()) .filter(component -> component.getVersionedComponentId().isPresent()) http://git-wip-us.apache.org/repos/asf/nifi/blob/270ce857/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 521b078..074302a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -71,6 +71,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; @@ -228,6 +229,20 @@ public class NiFiRegistryFlowMapper { return versionedId; } + private <E extends Exception> String getIdOrThrow(final Optional<String> currentVersionedId, final String componentId, final Supplier<E> exceptionSupplier) throws E { + if (currentVersionedId.isPresent()) { + return currentVersionedId.get(); + } else { + final String resolved = versionedComponentIds.get(componentId); + if (resolved == null) { + throw exceptionSupplier.get(); + } + + return resolved; + } + } + + private String getGroupId(final String groupId) { return versionedComponentIds.get(groupId); } @@ -265,39 +280,27 @@ public class NiFiRegistryFlowMapper { public ConnectableComponent 
mapConnectable(final Connectable connectable) { final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier()); - final Optional<String> versionedId = connectable.getVersionedComponentId(); - if (versionedId.isPresent()) { - component.setId(versionedId.get()); - } else { - final String resolved = versionedComponentIds.get(connectable.getIdentifier()); - if (resolved == null) { - throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component"); - } - - component.setId(resolved); - } + final String versionedId = getIdOrThrow(connectable.getVersionedComponentId(), connectable.getIdentifier(), + () -> new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component")); + component.setId(versionedId); component.setComments(connectable.getComments()); + + final String groupId; if (connectable instanceof RemoteGroupPort) { final RemoteGroupPort port = (RemoteGroupPort) connectable; final RemoteProcessGroup rpg = port.getRemoteProcessGroup(); final Optional<String> rpgVersionedId = rpg.getVersionedComponentId(); - final String groupId; - if (rpgVersionedId.isPresent()) { - groupId = rpgVersionedId.get(); - } else { - final String resolved = versionedComponentIds.get(rpg.getIdentifier()); - if (resolved == null) { - throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to"); - } - - groupId = resolved; - } + groupId = getIdOrThrow(rpgVersionedId, rpg.getIdentifier(), + () -> new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to")); - component.setGroupId(groupId); } else { - component.setGroupId(connectable.getProcessGroupIdentifier()); + groupId = 
getIdOrThrow(connectable.getProcessGroup().getVersionedComponentId(), connectable.getProcessGroupIdentifier(), + () -> new IllegalArgumentException("Unable to find the Versioned Component ID for the Process Group that " + connectable + " belongs to")); } + + component.setGroupId(groupId); + component.setName(connectable.getName()); component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name())); return component;
