This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 08d0352 NIFI-6985 Use correct versioned parameter contexts when child
process groups are version controlled
08d0352 is described below
commit 08d0352ac11398f3400e3d23a99ff7d6546261ce
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Jan 7 13:25:27 2020 -0500
NIFI-6985 Use correct versioned parameter contexts when child process
groups are version controlled
Signed-off-by: Pierre Villard <[email protected]>
This closes #3962.
---
.../apache/nifi/groups/StandardProcessGroup.java | 41 ++++++++++++++++++++--
1 file changed, 39 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index fc52ad7..fbcf664 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -128,6 +128,7 @@ import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
+import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
@@ -3781,14 +3782,21 @@ public final class StandardProcessGroup implements
ProcessGroup {
final ProcessGroup childGroup =
childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
final VersionedFlowCoordinates childCoordinates =
proposedChildGroup.getVersionedFlowCoordinates();
+ // if there is a nested process group that is versioned
controlled, make sure get the param contexts that go with that snapshot
+ // instead of the ones from the parent which would have been
passed in to this method
+ Map<String, VersionedParameterContext> childParameterContexts =
versionedParameterContexts;
+ if (childCoordinates != null && updateDescendantVersionedGroups) {
+ childParameterContexts =
getVersionedParameterContexts(childCoordinates);
+ }
+
if (childGroup == null) {
- final ProcessGroup added = addProcessGroup(group,
proposedChildGroup, componentIdSeed, variablesToSkip,
versionedParameterContexts);
+ final ProcessGroup added = addProcessGroup(group,
proposedChildGroup, componentIdSeed, variablesToSkip, childParameterContexts);
flowManager.onProcessGroupAdded(added);
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, this);
} else if (childCoordinates == null ||
updateDescendantVersionedGroups) {
updateProcessGroup(childGroup, proposedChildGroup,
componentIdSeed, updatedVersionedComponentIds, true, true,
updateDescendantVersionedGroups,
- variablesToSkip, versionedParameterContexts);
+ variablesToSkip, childParameterContexts);
LOG.info("Updated {}", childGroup);
}
@@ -4057,6 +4065,35 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
}
+ private Map<String,VersionedParameterContext>
getVersionedParameterContexts(final VersionedFlowCoordinates
versionedFlowCoordinates) {
+ final FlowRegistryClient flowRegistryClient =
flowController.getFlowRegistryClient();
+
+ final String registryId =
flowRegistryClient.getFlowRegistryId(versionedFlowCoordinates.getRegistryUrl());
+ if (registryId == null) {
+ throw new ResourceNotFoundException("Could not find any Flow
Registry registered with url: " + versionedFlowCoordinates.getRegistryUrl());
+ }
+
+ final FlowRegistry flowRegistry =
flowRegistryClient.getFlowRegistry(registryId);
+ if (flowRegistry == null) {
+ throw new ResourceNotFoundException("Could not find any Flow
Registry registered with identifier " + registryId);
+ }
+
+ final String bucketId = versionedFlowCoordinates.getBucketId();
+ final String flowId = versionedFlowCoordinates.getFlowId();
+ final int flowVersion = versionedFlowCoordinates.getVersion();
+
+ try {
+ final VersionedFlowSnapshot childSnapshot =
flowRegistry.getFlowContents(bucketId, flowId, flowVersion, false);
+ return childSnapshot.getParameterContexts();
+ } catch (final NiFiRegistryException e) {
+ throw new IllegalArgumentException("The Flow Registry with ID " +
registryId + " reports that no Flow exists with Bucket "
+ + bucketId + ", Flow " + flowId + ", Version " +
flowVersion);
+ } catch (final IOException ioe) {
+ throw new IllegalStateException(
+ "Failed to communicate with Flow Registry when attempting
to retrieve a versioned flow");
+ }
+ }
+
private ParameterContext createParameterContext(final
VersionedParameterContext versionedParameterContext, final String
parameterContextId) {
final Map<String, Parameter> parameters = new HashMap<>();
for (final VersionedParameter versionedParameter :
versionedParameterContext.getParameters()) {