This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d0352  NIFI-6985 Use correct versioned parameter contexts when child 
process groups are version controlled
08d0352 is described below

commit 08d0352ac11398f3400e3d23a99ff7d6546261ce
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Jan 7 13:25:27 2020 -0500

    NIFI-6985 Use correct versioned parameter contexts when child process 
groups are version controlled
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3962.
---
 .../apache/nifi/groups/StandardProcessGroup.java   | 41 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index fc52ad7..fbcf664 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -128,6 +128,7 @@ import org.apache.nifi.util.FlowDifferenceFilters;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.util.SnippetUtils;
+import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.VersionedFlowDTO;
@@ -3781,14 +3782,21 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             final ProcessGroup childGroup = 
childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
             final VersionedFlowCoordinates childCoordinates = 
proposedChildGroup.getVersionedFlowCoordinates();
 
+            // if there is a nested process group that is version controlled, make sure to get the parameter contexts that go with that snapshot
+            // instead of the ones from the parent, which would have been passed in to this method
+            Map<String, VersionedParameterContext> childParameterContexts = 
versionedParameterContexts;
+            if (childCoordinates != null && updateDescendantVersionedGroups) {
+                childParameterContexts = 
getVersionedParameterContexts(childCoordinates);
+            }
+
             if (childGroup == null) {
-                final ProcessGroup added = addProcessGroup(group, 
proposedChildGroup, componentIdSeed, variablesToSkip, 
versionedParameterContexts);
+                final ProcessGroup added = addProcessGroup(group, 
proposedChildGroup, componentIdSeed, variablesToSkip, childParameterContexts);
                 flowManager.onProcessGroupAdded(added);
                 
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
                 LOG.info("Added {} to {}", added, this);
             } else if (childCoordinates == null || 
updateDescendantVersionedGroups) {
                 updateProcessGroup(childGroup, proposedChildGroup, 
componentIdSeed, updatedVersionedComponentIds, true, true, 
updateDescendantVersionedGroups,
-                    variablesToSkip, versionedParameterContexts);
+                    variablesToSkip, childParameterContexts);
                 LOG.info("Updated {}", childGroup);
             }
 
@@ -4057,6 +4065,35 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
+    /**
+     * Fetches the versioned parameter contexts that belong to the flow snapshot identified by the given coordinates.
+     * The Flow Registry is resolved from the registry URL held in the coordinates, and the snapshot is then requested
+     * by bucket id, flow id and version.
+     *
+     * @param versionedFlowCoordinates the coordinates (registry URL, bucket, flow, version) of the versioned flow
+     * @return the value of {@code getParameterContexts()} on the retrieved snapshot
+     * @throws ResourceNotFoundException if no Flow Registry id is known for the registry URL, or no Flow Registry
+     *             is registered under that id
+     * @throws IllegalArgumentException if the registry lookup fails with a {@code NiFiRegistryException}
+     * @throws IllegalStateException if an {@code IOException} occurs while talking to the Flow Registry
+     */
+    private Map<String, VersionedParameterContext> getVersionedParameterContexts(final VersionedFlowCoordinates versionedFlowCoordinates) {
+        final FlowRegistryClient flowRegistryClient = flowController.getFlowRegistryClient();
+
+        final String registryId = flowRegistryClient.getFlowRegistryId(versionedFlowCoordinates.getRegistryUrl());
+        if (registryId == null) {
+            throw new ResourceNotFoundException("Could not find any Flow Registry registered with url: " + versionedFlowCoordinates.getRegistryUrl());
+        }
+
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry == null) {
+            throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
+        }
+
+        final String bucketId = versionedFlowCoordinates.getBucketId();
+        final String flowId = versionedFlowCoordinates.getFlowId();
+        final int flowVersion = versionedFlowCoordinates.getVersion();
+
+        try {
+            // NOTE(review): the trailing 'false' flag is passed through as in the original; its meaning is defined by FlowRegistry.getFlowContents - confirm there
+            final VersionedFlowSnapshot childSnapshot = flowRegistry.getFlowContents(bucketId, flowId, flowVersion, false);
+            return childSnapshot.getParameterContexts();
+        } catch (final NiFiRegistryException e) {
+            // chain the original exception so the registry's own error details are not lost
+            throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
+                    + bucketId + ", Flow " + flowId + ", Version " + flowVersion, e);
+        } catch (final IOException ioe) {
+            // chain the original exception so the underlying communication failure stays visible in the stack trace
+            throw new IllegalStateException(
+                    "Failed to communicate with Flow Registry when attempting to retrieve a versioned flow", ioe);
+        }
+    }
+
     private ParameterContext createParameterContext(final 
VersionedParameterContext versionedParameterContext, final String 
parameterContextId) {
         final Map<String, Parameter> parameters = new HashMap<>();
         for (final VersionedParameter versionedParameter : 
versionedParameterContext.getParameters()) {

Reply via email to