NIFI-4436: Bug fix to ensure that an RPG's ports are not removed until after connections to those ports have been established; also ensure that if a registry's name is changed, the new name is reflected immediately in VersionControlInformation objects
Signed-off-by: Matt Gilman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/014c542f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/014c542f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/014c542f Branch: refs/heads/master Commit: 014c542f48b61c027152129ae7cd8c8535dd6a64 Parents: 49aad2c Author: Mark Payne <[email protected]> Authored: Fri Dec 1 16:31:22 2017 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:55 2018 -0500 ---------------------------------------------------------------------- .../apache/nifi/groups/RemoteProcessGroup.java | 2 + .../apache/nifi/controller/FlowController.java | 1 + .../controller/StandardFlowSynchronizer.java | 2 + .../nifi/groups/StandardProcessGroup.java | 12 ++- .../registry/flow/RestBasedFlowRegistry.java | 42 +++++++--- .../nifi/remote/StandardRemoteProcessGroup.java | 88 ++++++++++++-------- .../org/apache/nifi/web/NiFiServiceFacade.java | 11 +++ .../nifi/web/StandardNiFiServiceFacade.java | 25 ++++++ .../nifi/web/api/ProcessGroupResource.java | 12 +-- .../apache/nifi/web/api/VersionsResource.java | 15 +++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 2 +- .../dao/impl/StandardRemoteProcessGroupDAO.java | 1 + 12 files changed, 156 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 
7d92246..39be045 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent { + void initialize(); + @Override String getIdentifier(); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2afa9dc..158aaa2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1779,6 +1779,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { instantiateSnippet(group, dto, true); + group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); } private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 28d9b79..9cbf323 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -360,6 +360,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); } + rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize); + // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them. 
final Document existingFlowConfiguration = parseFlowBytes(existingFlow); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 4b186a9..fb3d3a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -2964,6 +2964,13 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getStatus()) { @Override + public String getRegistryName() { + final String registryId = versionControlInformation.getRegistryIdentifier(); + final FlowRegistry registry = flowController.getFlowRegistryClient().getFlowRegistry(registryId); + return registry == null ? 
registryId : registry.getName(); + } + + @Override public boolean isModified() { boolean updated = false; while (true) { @@ -3220,7 +3227,7 @@ public final class StandardProcessGroup implements ProcessGroup { updated = flowStatus.compareAndSet(status, updatedStatus); } } catch (final IOException | NiFiRegistryException e) { - final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry"); + final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage()); setSyncFailedState(message); LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e); @@ -3451,6 +3458,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); + added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); LOG.info("Added {} to {}", added, this); } else if (childCoordinates == null || updateDescendantVersionedGroups) { updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip); @@ -3759,7 +3767,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Connectable destination = getConnectable(destinationGroup, proposed.getDestination()); if (destination == null) { - throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier() + throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId() + " but no component could be found in the Process Group with a corresponding identifier"); } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 1147b9e..21e5e0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -115,40 +115,61 @@ public class RestBasedFlowRegistry implements FlowRegistry { return (user == null || user.isAnonymous()) ? null : user.getIdentity(); } + private BucketClient getBucketClient(final NiFiUser user) { + final String identity = getIdentity(user); + final NiFiRegistryClient registryClient = getRegistryClient(); + final BucketClient bucketClient = identity == null ? registryClient.getBucketClient() : registryClient.getBucketClient(identity); + return bucketClient; + } + + private FlowSnapshotClient getFlowSnapshotClient(final NiFiUser user) { + final String identity = getIdentity(user); + final NiFiRegistryClient registryClient = getRegistryClient(); + final FlowSnapshotClient snapshotClient = identity == null ? registryClient.getFlowSnapshotClient() : registryClient.getFlowSnapshotClient(identity); + return snapshotClient; + } + + private FlowClient getFlowClient(final NiFiUser user) { + final String identity = getIdentity(user); + final NiFiRegistryClient registryClient = getRegistryClient(); + final FlowClient flowClient = identity == null ? 
registryClient.getFlowClient() : registryClient.getFlowClient(identity); + return flowClient; + } + @Override public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user)); + final BucketClient bucketClient = getBucketClient(user); return new HashSet<>(bucketClient.getAll()); } @Override public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user)); + final BucketClient bucketClient = getBucketClient(user); return bucketClient.get(bucketId); } @Override public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return new HashSet<>(flowClient.getByBucket(bucketId)); } @Override public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId)); } @Override public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return flowClient.create(flow); } @Override public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = 
getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return flowClient.delete(bucketId, flowId); } @@ -156,7 +177,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); versionedFlowSnapshot.setFlowContents(snapshot); @@ -174,13 +195,14 @@ public class RestBasedFlowRegistry implements FlowRegistry { @Override public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount(); + return (int) getFlowClient(user).get(bucketId, flowId).getVersionCount(); } @Override public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); + + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version); if (fetchRemoteFlows) { @@ -241,7 +263,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { @Override public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = 
getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return flowClient.get(bucketId, flowId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index ef05a1b..5808500 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -16,6 +16,40 @@ */ package org.apache.nifi.remote; +import static java.util.Objects.requireNonNull; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.Response; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; @@ -34,7 +68,6 @@ import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.EventReporter; import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; @@ -51,39 +84,6 @@ import org.apache.nifi.web.api.dto.PortDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import javax.ws.rs.core.Response; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - /** * Represents the Root 
Process Group of a remote NiFi Instance. Holds * information about that remote instance, as well as {@link IncomingPort}s and @@ -104,6 +104,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final EventReporter eventReporter; private final NiFiProperties nifiProperties; private final long remoteContentsCacheExpiration; + private volatile boolean initialized = false; private final AtomicReference<String> name = new AtomicReference<>(); private final AtomicReference<Position> position = new AtomicReference<>(new Position(0D, 0D)); @@ -179,7 +180,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final Runnable checkAuthorizations = new InitializationTask(); backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true); - backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); + backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS); + } + + @Override + public void initialize() { + if (initialized) { + return; + } + + initialized = true; backgroundThreadExecutor.submit(() -> { try { refreshFlowContents(); @@ -820,6 +830,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void refreshFlowContents() throws CommunicationsException { + if (!initialized) { + return; + } + try { // perform the request final ControllerDTO dto; @@ -1153,6 +1167,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void run() { + if (!initialized) { + return; + } + try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { try { final ControllerDTO dto = apiClient.getController(targetUris); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 02df16b..be77d10 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -418,6 +418,17 @@ public interface NiFiServiceFacade { void verifyComponentTypes(VersionedProcessGroup versionedGroup); /** + * Verifies that the flow identified by the given Version Control Information can be imported into the Process Group + * with the given id + * + * @param versionControlInfo the information about the versioned flow + * @param groupId the ID of the Process Group where the flow should be instantiated + * + * @throws IllegalStateException if the flow cannot be imported into the specified group + */ + void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId); + + /** * Creates a new Template based off the specified snippet. 
* * @param name name http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4945296..4adb85b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -1863,6 +1863,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + verifyImportProcessGroup(versionControlInfo, group); + } + + private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) { + if (group == null) { + return; + } + + final VersionControlInformation vci = group.getVersionControlInformation(); + if (vci != null) { + if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier()) + && Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) + && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { + + throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. 
" + + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A."); + } + } + + verifyImportProcessGroup(vciDto, group.getParent()); + } + + @Override public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) { // get the specified snippet final Snippet snippet = snippetDAO.getSnippet(snippetId); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 7b753d6..a3bb5b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1641,6 +1641,10 @@ public class ProcessGroupResource extends ApplicationResource { // Step 6: Replicate the request or call serviceFacade.updateProcessGroup final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); + if (versionControlInfo != null) { + serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId); + } + if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. 
// Step 2: Retrieve flow from Flow Registry @@ -1685,12 +1689,8 @@ public class ProcessGroupResource extends ApplicationResource { } } }, - () -> { - final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); - if (versionedFlowSnapshot != null) { - serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents()); - } - }, + () -> { + }, processGroupEntity -> { final ProcessGroupDTO processGroup = processGroupEntity.getComponent(); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 245713e..6dd641b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -424,15 +424,24 @@ public class VersionsResource extends ApplicationResource { if (versionedFlowDto == null) { throw new IllegalArgumentException("Version Control Information must be supplied."); } - if (versionedFlowDto.getBucketId() == null) { + if (StringUtils.isEmpty(versionedFlowDto.getBucketId())) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (versionedFlowDto.getFlowName() == null && versionedFlowDto.getFlowId() == null) { + if (StringUtils.isEmpty(versionedFlowDto.getFlowName()) && StringUtils.isEmpty(versionedFlowDto.getFlowId())) { throw new IllegalArgumentException("The Flow Name or 
Flow ID must be supplied."); } - if (versionedFlowDto.getRegistryId() == null) { + if (versionedFlowDto.getFlowName().length() > 1000) { + throw new IllegalArgumentException("The Flow Name cannot exceed 1,000 characters"); + } + if (StringUtils.isEmpty(versionedFlowDto.getRegistryId())) { throw new IllegalArgumentException("The Registry ID must be supplied."); } + if (versionedFlowDto.getDescription() != null && versionedFlowDto.getDescription().length() > 65535) { + throw new IllegalArgumentException("Flow Description cannot exceed 65,535 characters"); + } + if (versionedFlowDto.getComments() != null && versionedFlowDto.getComments().length() > 65535) { + throw new IllegalArgumentException("Comments cannot exceed 65,535 characters"); + } // ensure we're not attempting to version the root group final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowController.ROOT_GROUP_ID_ALIAS); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 8e5974a..7a1442d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1792,7 +1792,7 @@ public final class DtoFactory { componentDto.setId(processorDto.getId()); componentDto.setName(processorDto.getName()); componentDto.setProcessGroupId(processorDto.getParentGroupId()); - 
componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); componentDto.setState(processorDto.getState()); componentDto.setValidationErrors(processorDto.getValidationErrors()); component.setComponent(componentDto); http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index c570dfc..941aae0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -85,6 +85,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // create the remote process group RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris); + remoteProcessGroup.initialize(); // set other properties updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);
