This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 64b110c292 NIFI-11464 Improvements for importing nested versioned
flows (#7218)
64b110c292 is described below
commit 64b110c2929b9d9de4d354d0ecc05c8b9c0fb8dc
Author: Bryan Bende <[email protected]>
AuthorDate: Mon Jun 5 10:11:33 2023 -0400
NIFI-11464 Improvements for importing nested versioned flows (#7218)
* NIFI-11464 Improvements for importing nested versioned flows
- Introduce FlowSnapshotContainer to return root snapshot + children
- Introduce ControllerServiceResolver to extract logic from service facade
- Update resolution logic to correctly consider all services in the
hierarchy
- Merge additional parameter contexts and parameter providers from child to
parent
- Add unit test for controller service resolver
- Replace use of emptySet/emptyMap with new set/map instance
---
.../StandardControllerServiceApiLookup.java | 71 +++++
.../service/StandardControllerServiceResolver.java | 197 +++++++++++++
.../StandardVersionedComponentSynchronizer.java | 25 +-
.../apache/nifi/groups/StandardProcessGroup.java | 4 +-
.../flow/StandardFlowRegistryClientNode.java | 19 +-
.../java/org/apache/nifi/util/BundleUtils.java | 0
.../StandardControllerServiceResolverTest.java | 296 +++++++++++++++++++
.../convert-record-external-schema-registry.json | 321 +++++++++++++++++++++
.../child.json | 230 +++++++++++++++
.../parent.json | 260 +++++++++++++++++
.../service/ControllerServiceApiLookup.java | 37 +++
.../service/ControllerServiceResolver.java | 40 +++
.../nifi/registry/flow/FlowRegistryClientNode.java | 2 +-
.../nifi/registry/flow/FlowSnapshotContainer.java | 141 +++++++++
.../registry/flow/FlowSnapshotContainerTest.java | 153 ++++++++++
.../org/apache/nifi/controller/FlowController.java | 10 +
.../org/apache/nifi/web/NiFiServiceFacade.java | 9 +-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 139 +--------
.../apache/nifi/web/api/FlowUpdateResource.java | 19 +-
.../apache/nifi/web/api/ProcessGroupResource.java | 22 +-
.../org/apache/nifi/web/api/VersionsResource.java | 9 +-
.../nifi/web/controller/ControllerFacade.java | 5 +
.../apache/nifi/web/api/TestVersionsResource.java | 5 +-
23 files changed, 1835 insertions(+), 179 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceApiLookup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceApiLookup.java
new file mode 100644
index 0000000000..a31cc66731
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceApiLookup.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.util.BundleUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * {@link ControllerServiceApiLookup} that discovers the Controller Service APIs required by a
+ * component by asking the {@link ExtensionManager} for a temporary instance of that component
+ * and inspecting its property descriptors.
+ */
+public class StandardControllerServiceApiLookup implements ControllerServiceApiLookup {
+
+    private final ExtensionManager extensionManager;
+
+    public StandardControllerServiceApiLookup(final ExtensionManager extensionManager) {
+        this.extensionManager = extensionManager;
+    }
+
+    /**
+     * Determines which Controller Service API each property of the given component type requires.
+     *
+     * @param type the component type passed to the extension manager
+     * @param bundle the versioned bundle the component was exported with
+     * @return a map from property descriptor name to the required service API; an immutable empty map
+     *         when no compatible bundle can be located, otherwise a map holding one entry per
+     *         descriptor that defines a Controller Service
+     */
+    @Override
+    public Map<String, ControllerServiceAPI> getRequiredServiceApis(final String type, final Bundle bundle) {
+        final Optional<BundleCoordinate> resolvedCoordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, type, BundleUtils.createBundleDto(bundle));
+        if (resolvedCoordinate.isPresent()) {
+            return collectServiceApis(type, resolvedCoordinate.get());
+        }
+        return Collections.emptyMap();
+    }
+
+    // Walks the descriptors of a temporary component instance and records the API of every
+    // descriptor that identifies a Controller Service, keyed by descriptor name.
+    private Map<String, ControllerServiceAPI> collectServiceApis(final String type, final BundleCoordinate coordinate) {
+        final Map<String, ControllerServiceAPI> apisByPropertyName = new HashMap<>();
+        final ConfigurableComponent prototype = extensionManager.getTempComponent(type, coordinate);
+
+        for (final PropertyDescriptor descriptor : prototype.getPropertyDescriptors()) {
+            final Class<? extends ControllerService> apiClass = descriptor.getControllerServiceDefinition();
+            if (apiClass != null) {
+                final ControllerServiceAPI api = describeServiceApi(apiClass);
+                apisByPropertyName.put(descriptor.getName(), api);
+            }
+        }
+
+        return apisByPropertyName;
+    }
+
+    // Builds the versioned API description (canonical type name plus bundle coordinate) for a service
+    // interface, using the bundle that owns the interface's class loader.
+    private ControllerServiceAPI describeServiceApi(final Class<? extends ControllerService> apiClass) {
+        final BundleCoordinate apiCoordinate = extensionManager.getBundle(apiClass.getClassLoader())
+                .getBundleDetails()
+                .getCoordinate();
+
+        final ControllerServiceAPI api = new ControllerServiceAPI();
+        api.setType(apiClass.getCanonicalName());
+        api.setBundle(new Bundle(apiCoordinate.getGroup(), apiCoordinate.getId(), apiCoordinate.getVersion()));
+        return api;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceResolver.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceResolver.java
new file mode 100644
index 0000000000..3819b7e241
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceResolver.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.VersionedConfigurableExtension;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ControllerServiceResolver} that rewrites property values in an imported versioned flow so
+ * that references to external Controller Services point at matching services that are available
+ * in the destination hierarchy.
+ *
+ * A reference is rewritten only when exactly one available service has the same name as the external
+ * reference and implements the API required by the referencing property. In every other case the
+ * property value is left untouched.
+ */
+public class StandardControllerServiceResolver implements ControllerServiceResolver {
+
+    private final Authorizer authorizer;
+    private final FlowManager flowManager;
+    private final NiFiRegistryFlowMapper flowMapper;
+    private final ControllerServiceProvider controllerServiceProvider;
+    private final ControllerServiceApiLookup controllerServiceApiLookup;
+
+    public StandardControllerServiceResolver(final Authorizer authorizer,
+                                             final FlowManager flowManager,
+                                             final NiFiRegistryFlowMapper flowMapper,
+                                             final ControllerServiceProvider controllerServiceProvider,
+                                             final ControllerServiceApiLookup controllerServiceApiLookup) {
+        this.authorizer = authorizer;
+        this.flowManager = flowManager;
+        this.flowMapper = flowMapper;
+        this.controllerServiceProvider = controllerServiceProvider;
+        this.controllerServiceApiLookup = controllerServiceApiLookup;
+    }
+
+    /**
+     * Resolves external Controller Service references for the top-level snapshot of the container and
+     * for every nested process group, modifying component properties in place.
+     *
+     * @param flowSnapshotContainer container holding the top-level snapshot and any child snapshots
+     * @param parentGroupId identifier of the existing group the flow is being imported into
+     * @param user the user performing the import; only services this user may READ are candidates
+     */
+    @Override
+    public void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String parentGroupId, final NiFiUser user) {
+        final RegisteredFlowSnapshot topLevelSnapshot = flowSnapshotContainer.getFlowSnapshot();
+        final VersionedProcessGroup versionedGroup = topLevelSnapshot.getFlowContents();
+        final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = topLevelSnapshot.getExternalControllerServices();
+
+        final ProcessGroup parentGroup = flowManager.getGroup(parentGroupId);
+
+        // NOTE(review): the 'true' flag presumably asks for services of ancestor groups as well - confirm against ProcessGroup
+        final Set<VersionedControllerService> ancestorServices = parentGroup.getControllerServices(true).stream()
+                .filter(serviceNode -> serviceNode.isAuthorized(authorizer, RequestAction.READ, user))
+                .map(serviceNode -> flowMapper.mapControllerService(serviceNode, controllerServiceProvider, new HashSet<>(), new HashMap<>()))
+                .collect(Collectors.toSet());
+
+        // Each stack level holds the services declared by one group on the path from the destination down to the group being processed
+        final Stack<Set<VersionedControllerService>> serviceHierarchyStack = new Stack<>();
+        serviceHierarchyStack.push(ancestorServices);
+
+        resolveInheritedControllerServices(flowSnapshotContainer, versionedGroup, externalControllerServiceReferences, serviceHierarchyStack);
+    }
+
+    /**
+     * Resolves references for the processors and services of one versioned group, then recurses into
+     * its child groups. The group's own services are pushed onto the hierarchy stack for the duration
+     * of the call so that descendants can see them.
+     */
+    private void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final VersionedProcessGroup versionedGroup,
+                                                    final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
+                                                    final Stack<Set<VersionedControllerService>> serviceHierarchyStack) {
+
+        final Set<VersionedControllerService> currentGroupServices = versionedGroup.getControllerServices() == null ? Collections.emptySet() : versionedGroup.getControllerServices();
+        serviceHierarchyStack.push(currentGroupServices);
+
+        final Set<VersionedControllerService> availableControllerServices = serviceHierarchyStack.stream()
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+
+        // The id set is identical for every component in this group, so build it once here rather than per property
+        final Set<String> availableControllerServiceIds = availableControllerServices.stream()
+                .map(VersionedControllerService::getIdentifier)
+                .collect(Collectors.toSet());
+
+        for (final VersionedProcessor processor : versionedGroup.getProcessors()) {
+            resolveInheritedControllerServices(processor, availableControllerServices, availableControllerServiceIds, externalControllerServiceReferences);
+        }
+
+        // Iterate the null-safe set computed above; calling the getter again would throw when the group has no service set
+        for (final VersionedControllerService service : currentGroupServices) {
+            resolveInheritedControllerServices(service, availableControllerServices, availableControllerServiceIds, externalControllerServiceReferences);
+        }
+
+        // If the child group is under version control, the external service references need to come from the snapshot
+        // of the child instead of what was passed into this method, which was for the parent group
+        for (final VersionedProcessGroup child : versionedGroup.getProcessGroups()) {
+            final Map<String, ExternalControllerServiceReference> childExternalServices;
+            if (child.getVersionedFlowCoordinates() == null) {
+                childExternalServices = externalControllerServiceReferences;
+            } else {
+                final RegisteredFlowSnapshot childSnapshot = flowSnapshotContainer.getChildSnapshot(child.getIdentifier());
+                if (childSnapshot == null) {
+                    childExternalServices = Collections.emptyMap();
+                } else {
+                    childExternalServices = childSnapshot.getExternalControllerServices();
+                }
+            }
+            resolveInheritedControllerServices(flowSnapshotContainer, child, childExternalServices, serviceHierarchyStack);
+        }
+
+        serviceHierarchyStack.pop();
+    }
+
+    /**
+     * Rewrites the service-identifying properties of a single component. A property is updated only when
+     * its current value is not the id of an available service, the value is a key of the external
+     * references, and exactly one available service matches the referenced name and the required API.
+     */
+    private void resolveInheritedControllerServices(final VersionedConfigurableExtension component,
+                                                    final Set<VersionedControllerService> availableControllerServices,
+                                                    final Set<String> availableControllerServiceIds,
+                                                    final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) {
+
+        final Map<String, ControllerServiceAPI> componentRequiredApis = controllerServiceApiLookup.getRequiredServiceApis(component.getType(), component.getBundle());
+        if (componentRequiredApis.isEmpty()) {
+            return;
+        }
+
+        final Map<String, VersionedPropertyDescriptor> propertyDescriptors = component.getPropertyDescriptors();
+        final Map<String, String> componentProperties = component.getProperties();
+
+        for (final Map.Entry<String, String> entry : componentProperties.entrySet()) {
+            final String propertyName = entry.getKey();
+            final String propertyValue = entry.getValue();
+
+            final VersionedPropertyDescriptor propertyDescriptor = propertyDescriptors.get(propertyName);
+            if (propertyDescriptor == null) {
+                continue;
+            }
+
+            if (!propertyDescriptor.getIdentifiesControllerService()) {
+                continue;
+            }
+
+            // If the referenced Controller Service is available, there is nothing to resolve.
+            if (availableControllerServiceIds.contains(propertyValue)) {
+                continue;
+            }
+
+            final ExternalControllerServiceReference externalServiceReference = externalControllerServiceReferences == null ? null : externalControllerServiceReferences.get(propertyValue);
+            if (externalServiceReference == null) {
+                continue;
+            }
+
+            final ControllerServiceAPI descriptorRequiredApi = componentRequiredApis.get(propertyName);
+            if (descriptorRequiredApi == null) {
+                continue;
+            }
+
+            final String externalControllerServiceName = externalServiceReference.getName();
+            final List<VersionedControllerService> matchingControllerServices = availableControllerServices.stream()
+                    .filter(service -> service.getName().equals(externalControllerServiceName))
+                    .filter(service -> implementsApi(descriptorRequiredApi, service))
+                    .collect(Collectors.toList());
+
+            // Zero or several candidates is ambiguous, so leave the original value in place
+            if (matchingControllerServices.size() != 1) {
+                continue;
+            }
+
+            final VersionedControllerService matchingService = matchingControllerServices.get(0);
+            final String resolvedId = matchingService.getIdentifier();
+            // Replaces the value of a key that already exists in the map being iterated
+            componentProperties.put(propertyName, resolvedId);
+        }
+    }
+
+    /**
+     * Checks whether the service declares an API with the same type, bundle group and bundle artifact as
+     * the required API. The bundle version is not compared.
+     */
+    private boolean implementsApi(final ControllerServiceAPI requiredServiceApi, final VersionedControllerService versionedControllerService) {
+        if (versionedControllerService.getControllerServiceApis() == null) {
+            return false;
+        }
+
+        for (final ControllerServiceAPI implementedApi : versionedControllerService.getControllerServiceApis()) {
+            if (implementedApi.getType().equals(requiredServiceApi.getType())
+                    && implementedApi.getBundle().getGroup().equals(requiredServiceApi.getBundle().getGroup())
+                    && implementedApi.getBundle().getArtifact().equals(requiredServiceApi.getBundle().getArtifact())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 62be6a1467..889a6e8996 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -94,6 +94,7 @@ import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.FlowRegistryClientContextFactory;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -513,33 +514,20 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
}
private void synchronizeChildGroups(final ProcessGroup group, final
VersionedProcessGroup proposed, final Map<String, VersionedParameterContext>
versionedParameterContexts,
- final Map<String, ProcessGroup>
childGroupsByVersionedId,
- final Map<String,
ParameterProviderReference> parameterProviderReferences, final ProcessGroup
topLevelGroup) throws ProcessorInstantiationException {
+ final Map<String, ProcessGroup>
childGroupsByVersionedId, final Map<String, ParameterProviderReference>
parameterProviderReferences,
+ final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
for (final VersionedProcessGroup proposedChildGroup :
proposed.getProcessGroups()) {
final ProcessGroup childGroup =
childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
final VersionedFlowCoordinates childCoordinates =
proposedChildGroup.getVersionedFlowCoordinates();
- // if there is a nested process group that is version controlled,
make sure get the param contexts that go with that snapshot
- // instead of the ones from the parent which would have been
passed in to this method
- Map<String, VersionedParameterContext> childParameterContexts =
versionedParameterContexts;
- if (childCoordinates != null &&
syncOptions.isUpdateDescendantVersionedFlows()) {
- final String childParameterContextName =
proposedChildGroup.getParameterContextName();
- if (childParameterContextName != null &&
!versionedParameterContexts.containsKey(childParameterContextName)) {
- childParameterContexts =
getVersionedParameterContexts(childCoordinates);
- } else {
- childParameterContexts = versionedParameterContexts;
- }
- }
-
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group,
proposedChildGroup, context.getComponentIdGenerator(), preExistingVariables,
- childParameterContexts, parameterProviderReferences,
topLevelGroup);
+ versionedParameterContexts,
parameterProviderReferences, topLevelGroup);
context.getFlowManager().onProcessGroupAdded(added);
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, group);
} else if (childCoordinates == null ||
syncOptions.isUpdateDescendantVersionedFlows()) {
-
final StandardVersionedComponentSynchronizer sync = new
StandardVersionedComponentSynchronizer(context);
sync.setPreExistingVariables(preExistingVariables);
sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
@@ -548,7 +536,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
.build();
sync.setSynchronizationOptions(options);
- sync.synchronize(childGroup, proposedChildGroup,
childParameterContexts, parameterProviderReferences, topLevelGroup);
+ sync.synchronize(childGroup, proposedChildGroup,
versionedParameterContexts, parameterProviderReferences, topLevelGroup);
LOG.info("Updated {}", childGroup);
}
@@ -2229,7 +2217,8 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
final int flowVersion = versionedFlowCoordinates.getVersion();
try {
- final RegisteredFlowSnapshot childSnapshot =
flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getAnonymousContext(),
bucketId, flowId, flowVersion, false);
+ final FlowSnapshotContainer snapshotContainer =
flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getAnonymousContext(),
bucketId, flowId, flowVersion, false);
+ final RegisteredFlowSnapshot childSnapshot =
snapshotContainer.getFlowSnapshot();
return childSnapshot.getParameterContexts();
} catch (final FlowRegistryException e) {
throw new IllegalArgumentException("The Flow Registry with ID " +
registryId + " reports that no Flow exists with Bucket "
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index c0e48a0bb4..9cf0da4d41 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -87,6 +87,7 @@ import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.FlowRegistryClientContextFactory;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
@@ -3864,8 +3865,9 @@ public final class StandardProcessGroup implements
ProcessGroup {
throw new FlowRegistryException(flowRegistry + " cannot
currently be used to synchronize with Flow Registry because it is currently
validating");
}
- final RegisteredFlowSnapshot registrySnapshot =
flowRegistry.getFlowContents(
+ final FlowSnapshotContainer registrySnapshotContainer =
flowRegistry.getFlowContents(
FlowRegistryClientContextFactory.getAnonymousContext(),
vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false);
+ final RegisteredFlowSnapshot registrySnapshot =
registrySnapshotContainer.getFlowSnapshot();
final VersionedProcessGroup registryFlow =
registrySnapshot.getFlowContents();
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | FlowRegistryException e) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
index 98a0925880..9c27972b3a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
@@ -216,19 +216,20 @@ public final class StandardFlowRegistryClientNode extends
AbstractComponentNode
}
@Override
- public RegisteredFlowSnapshot getFlowContents(
+ public FlowSnapshotContainer getFlowContents(
final FlowRegistryClientUserContext context, final String
bucketId, final String flowId, final int version, final boolean fetchRemoteFlows
) throws FlowRegistryException, IOException {
- final RegisteredFlowSnapshot flowSnapshot = execute(()
->client.get().getComponent().getFlowContents(getConfigurationContext(context),
bucketId, flowId, version));
+ final RegisteredFlowSnapshot flowSnapshot = execute(() ->
client.get().getComponent().getFlowContents(getConfigurationContext(context),
bucketId, flowId, version));
+ final FlowSnapshotContainer snapshotContainer = new
FlowSnapshotContainer(flowSnapshot);
if (fetchRemoteFlows) {
final VersionedProcessGroup contents =
flowSnapshot.getFlowContents();
for (final VersionedProcessGroup child :
contents.getProcessGroups()) {
- populateVersionedContentsRecursively(context, child);
+ populateVersionedContentsRecursively(context, child,
snapshotContainer);
}
}
- return flowSnapshot;
+ return snapshotContainer;
}
@Override
@@ -296,7 +297,8 @@ public final class StandardFlowRegistryClientNode extends
AbstractComponentNode
return context.getNiFiUserIdentity().orElse(null);
}
- private void populateVersionedContentsRecursively(final
FlowRegistryClientUserContext context, final VersionedProcessGroup group)
throws IOException, FlowRegistryException {
+ private void populateVersionedContentsRecursively(final
FlowRegistryClientUserContext context, final VersionedProcessGroup group,
+ final
FlowSnapshotContainer snapshotContainer) throws FlowRegistryException {
if (group == null) {
return;
}
@@ -326,10 +328,12 @@ public final class StandardFlowRegistryClientNode extends
AbstractComponentNode
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
coordinates.setLatest(snapshot.isLatest());
+
+ snapshotContainer.addChildSnapshot(snapshot, group);
}
for (final VersionedProcessGroup child : group.getProcessGroups()) {
- populateVersionedContentsRecursively(context, child);
+ populateVersionedContentsRecursively(context, child,
snapshotContainer);
}
}
@@ -345,7 +349,8 @@ public final class StandardFlowRegistryClientNode extends
AbstractComponentNode
for (final FlowRegistryClientNode clientNode : clientNodes) {
try {
logger.debug("Attempting to fetch flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
- final RegisteredFlowSnapshot snapshot =
clientNode.getFlowContents(context, bucketId, flowId, version,
fetchRemoteFlows);
+ final FlowSnapshotContainer snapshotContainer =
clientNode.getFlowContents(context, bucketId, flowId, version,
fetchRemoteFlows);
+ final RegisteredFlowSnapshot snapshot =
snapshotContainer.getFlowSnapshot();
coordinates.setRegistryId(clientNode.getIdentifier());
logger.debug("Successfully fetched flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
similarity index 100%
rename from
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
rename to
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceResolverTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceResolverTest.java
new file mode 100644
index 0000000000..89d54c5ef2
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceResolverTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class StandardControllerServiceResolverTest {
+
+ private static final String BASE_SNAPSHOT_LOCATION =
"src/test/resources/snapshots";
+ private static final String CHILD_REFERENCES_SERVICES_FROM_PARENT_LOCATION
= BASE_SNAPSHOT_LOCATION + "/versioned-child-services-from-parent";
+ private static final String STANDARD_EXTERNAL_SERVICE_REFERENCE =
BASE_SNAPSHOT_LOCATION + "/standard-external-service-reference";
+
+ private Authorizer authorizer;
+ private FlowManager flowManager;
+ private NiFiRegistryFlowMapper flowMapper;
+ private ControllerServiceProvider controllerServiceProvider;
+ private ControllerServiceApiLookup controllerServiceApiLookup;
+
+ private NiFiUser nifiUser;
+ private ProcessGroup parentGroup;
+
+ private ControllerServiceResolver serviceResolver;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Builds a fresh set of mocks and a new resolver before every test.
+ *
+ * The parent ProcessGroup mock reports the identifier "parentGroup", exposes no
+ * controller services by default (individual tests re-stub this), and is the group
+ * returned by the FlowManager mock when looked up by that identifier.
+ */
+ @BeforeEach
+ public void setup() {
+ authorizer = mock(Authorizer.class);
+ flowManager = mock(FlowManager.class);
+ flowMapper = mock(NiFiRegistryFlowMapper.class);
+ controllerServiceProvider = mock(ControllerServiceProvider.class);
+ controllerServiceApiLookup = mock(ControllerServiceApiLookup.class);
+
+ nifiUser = mock(NiFiUser.class);
+
+ parentGroup = mock(ProcessGroup.class);
+ when(parentGroup.getIdentifier()).thenReturn("parentGroup");
+
when(parentGroup.getControllerServices(true)).thenReturn(Collections.emptySet());
+
when(flowManager.getGroup(parentGroup.getIdentifier())).thenReturn(parentGroup);
+
+ // Resolver under test, wired exclusively with the mocks created above
+ serviceResolver = new StandardControllerServiceResolver(authorizer,
flowManager, flowMapper,
+ controllerServiceProvider, controllerServiceApiLookup);
+ }
+
+ /**
+ * Verifies that a versioned child group whose ConvertRecord processor references
+ * reader/writer services by ids that differ from the parent's service ids has its
+ * "record-reader" and "record-writer" properties rewritten to the identifiers of the
+ * parent's MyAvroReader and MyAvroRecordSetWriter after inherited services are resolved.
+ *
+ * @throws IOException if a snapshot fixture cannot be read
+ */
+ @Test
+ public void testVersionedControlledChildResolveServicesFromParent() throws
IOException {
+ // Load individual snapshots
+ final RegisteredFlowSnapshot parent =
loadSnapshot(CHILD_REFERENCES_SERVICES_FROM_PARENT_LOCATION + "/parent.json");
+ final RegisteredFlowSnapshot child =
loadSnapshot(CHILD_REFERENCES_SERVICES_FROM_PARENT_LOCATION + "/child.json");
+
+ // Find the child group inside parent where the child snapshot would
have been loaded, and simulate loading the contents
+ final VersionedProcessGroup matchingChildGroup =
findChildGroupByName(parent.getFlowContents(),
child.getFlowContents().getName());
+ assertNotNull(matchingChildGroup);
+
matchingChildGroup.setProcessors(child.getFlowContents().getProcessors());
+
matchingChildGroup.setControllerServices(child.getFlowContents().getControllerServices());
+
+ // Create container with snapshots
+ final FlowSnapshotContainer snapshotContainer = new
FlowSnapshotContainer(parent);
+ snapshotContainer.addChildSnapshot(child, matchingChildGroup);
+
+ // Before resolving, verify that child processor references different
service ids
+ final VersionedProcessor childConvertRecord =
findProcessorByName(matchingChildGroup, "ConvertRecord");
+ assertNotNull(childConvertRecord);
+
+ final String childConvertRecordReaderId =
childConvertRecord.getProperties().get("record-reader");
+ final String childConvertRecordWriterId =
childConvertRecord.getProperties().get("record-writer");
+ assertNotNull(childConvertRecordReaderId);
+ assertNotNull(childConvertRecordWriterId);
+
+ final VersionedControllerService parentReader =
findServiceByName(parent.getFlowContents(), "MyAvroReader");
+ final VersionedControllerService parentWriter =
findServiceByName(parent.getFlowContents(), "MyAvroRecordSetWriter");
+ assertNotNull(parentReader);
+ assertNotNull(parentWriter);
+
+ assertNotEquals(childConvertRecordReaderId,
parentReader.getIdentifier());
+ assertNotEquals(childConvertRecordWriterId,
parentWriter.getIdentifier());
+
+ // Setup the ControllerServiceAPI lookup for ConvertRecord
+ final Map<String, ControllerServiceAPI> convertRecordRequiredApis =
new HashMap<>();
+ convertRecordRequiredApis.put("record-reader",
createServiceApi("org.apache.nifi.serialization.RecordReaderFactory",
+ "org.apache.nifi", "nifi-standard-services-api-nar",
"1.21.0-SNAPSHOT"));
+ convertRecordRequiredApis.put("record-writer",
createServiceApi("org.apache.nifi.serialization.RecordSetWriterFactory",
+ "org.apache.nifi", "nifi-standard-services-api-nar",
"1.21.0-SNAPSHOT"));
+
+
when(controllerServiceApiLookup.getRequiredServiceApis(childConvertRecord.getType(),
childConvertRecord.getBundle()))
+ .thenReturn(convertRecordRequiredApis);
+
+ // Resolve inherited services
+ serviceResolver.resolveInheritedControllerServices(snapshotContainer,
parentGroup.getIdentifier(), nifiUser);
+
+ // Verify child processor now references ids from parent
+ final String resolvedConvertRecordReaderId =
childConvertRecord.getProperties().get("record-reader");
+ assertEquals(parentReader.getIdentifier(),
resolvedConvertRecordReaderId);
+
+ final String resolvedConvertRecordWriterId =
childConvertRecord.getProperties().get("record-writer");
+ assertEquals(parentWriter.getIdentifier(),
resolvedConvertRecordWriterId);
+ }
+
+ /**
+ * Verifies that the AvroReader's "schema-registry" property, which initially holds an
+ * id taken from the snapshot, is rewritten to the identifier of the single authorized
+ * service offered by the parent group ("external-schema-registry") that is named
+ * AvroSchemaRegistry and exposes the SchemaRegistry API.
+ *
+ * @throws IOException if the snapshot fixture cannot be read
+ */
+ @Test
+ public void testExternalServiceResolvedFromOutsideSnapshot() throws
IOException {
+ final RegisteredFlowSnapshot snapshot =
loadSnapshot(STANDARD_EXTERNAL_SERVICE_REFERENCE +
"/convert-record-external-schema-registry.json");
+ final FlowSnapshotContainer snapshotContainer = new
FlowSnapshotContainer(snapshot);
+
+ // Get the AvroReader and the id of the SR it is using before
resolving external services
+ final VersionedControllerService avroReader =
findServiceByName(snapshot.getFlowContents(), "AvroReader");
+ assertNotNull(avroReader);
+
+ final String avroReaderSchemaRegistryId =
avroReader.getProperties().get("schema-registry");
+ assertNotNull(avroReaderSchemaRegistryId);
+
+ // Setup the ControllerServiceAPI lookup for AvroReader
+ final ControllerServiceAPI schemaRegistryServiceApi =
createServiceApi("org.apache.nifi.schemaregistry.services.SchemaRegistry",
+ "org.apache.nifi", "nifi-standard-services-api-nar",
"1.21.0-SNAPSHOT");
+
+ final Map<String, ControllerServiceAPI> avroReaderRequiredApis = new
HashMap<>();
+ avroReaderRequiredApis.put("schema-registry",
schemaRegistryServiceApi);
+
+
when(controllerServiceApiLookup.getRequiredServiceApis(avroReader.getType(),
avroReader.getBundle()))
+ .thenReturn(avroReaderRequiredApis);
+
+ // Setup an existing service from the parent group that implements the
same API with same name
+ final ControllerServiceNode schemaRegistryServiceNode =
mock(ControllerServiceNode.class);
+
when(parentGroup.getControllerServices(true)).thenReturn(Collections.singleton(schemaRegistryServiceNode));
+ when(schemaRegistryServiceNode.isAuthorized(any(Authorizer.class),
any(RequestAction.class), any(NiFiUser.class))).thenReturn(true);
+
+ final VersionedControllerService schemaRegistryVersionedService = new
VersionedControllerService();
+
schemaRegistryVersionedService.setIdentifier("external-schema-registry");
+ schemaRegistryVersionedService.setName("AvroSchemaRegistry");
+
schemaRegistryVersionedService.setControllerServiceApis(Collections.singletonList(schemaRegistryServiceApi));
+
+ when(flowMapper.mapControllerService(schemaRegistryServiceNode,
controllerServiceProvider, Collections.emptySet(), Collections.emptyMap()))
+ .thenReturn(schemaRegistryVersionedService);
+
+ // Resolve inherited services
+ serviceResolver.resolveInheritedControllerServices(snapshotContainer,
parentGroup.getIdentifier(), nifiUser);
+
+ // Verify the SR id in the AvroReader has been resolved to the
external service id
+ final String avroReaderSchemaRegistryIdAfterResolution =
avroReader.getProperties().get("schema-registry");
+ assertNotNull(avroReaderSchemaRegistryIdAfterResolution);
+ assertNotEquals(avroReaderSchemaRegistryId,
avroReaderSchemaRegistryIdAfterResolution);
+ assertEquals(schemaRegistryVersionedService.getIdentifier(),
avroReaderSchemaRegistryIdAfterResolution);
+ }
+
+ /**
+ * Verifies that the AvroReader's "schema-registry" property is left unchanged when the
+ * parent group offers two authorized services that both carry the name
+ * AvroSchemaRegistry and expose the same SchemaRegistry API, so that no single
+ * candidate can be chosen.
+ *
+ * @throws IOException if the snapshot fixture cannot be read
+ */
+ @Test
+ public void
testExternalServiceNotResolvedFromOutsideSnapshotBecauseMultipleWithSameNameAndType()
throws IOException {
+ final RegisteredFlowSnapshot snapshot =
loadSnapshot(STANDARD_EXTERNAL_SERVICE_REFERENCE +
"/convert-record-external-schema-registry.json");
+ final FlowSnapshotContainer snapshotContainer = new
FlowSnapshotContainer(snapshot);
+
+ // Get the AvroReader and the id of the SR it is using before
resolving external services
+ final VersionedControllerService avroReader =
findServiceByName(snapshot.getFlowContents(), "AvroReader");
+ assertNotNull(avroReader);
+
+ final String avroReaderSchemaRegistryId =
avroReader.getProperties().get("schema-registry");
+ assertNotNull(avroReaderSchemaRegistryId);
+
+ // Setup the ControllerServiceAPI lookup for AvroReader
+ final ControllerServiceAPI schemaRegistryServiceApi =
createServiceApi("org.apache.nifi.schemaregistry.services.SchemaRegistry",
+ "org.apache.nifi", "nifi-standard-services-api-nar",
"1.21.0-SNAPSHOT");
+
+ final Map<String, ControllerServiceAPI> avroReaderRequiredApis = new
HashMap<>();
+ avroReaderRequiredApis.put("schema-registry",
schemaRegistryServiceApi);
+
+
when(controllerServiceApiLookup.getRequiredServiceApis(avroReader.getType(),
avroReader.getBundle()))
+ .thenReturn(avroReaderRequiredApis);
+
+ // Setup two existing services from the parent group that implement
the same API with same name
+ final ControllerServiceNode schemaRegistryServiceNode1 =
mock(ControllerServiceNode.class);
+ final ControllerServiceNode schemaRegistryServiceNode2 =
mock(ControllerServiceNode.class);
+ when(parentGroup.getControllerServices(true)).thenReturn(new
HashSet<>(Arrays.asList(schemaRegistryServiceNode1,
schemaRegistryServiceNode2)));
+ when(schemaRegistryServiceNode1.isAuthorized(any(Authorizer.class),
any(RequestAction.class), any(NiFiUser.class))).thenReturn(true);
+ when(schemaRegistryServiceNode2.isAuthorized(any(Authorizer.class),
any(RequestAction.class), any(NiFiUser.class))).thenReturn(true);
+
+ final VersionedControllerService schemaRegistryVersionedService1 = new
VersionedControllerService();
+
schemaRegistryVersionedService1.setIdentifier("external-schema-registry-1");
+ schemaRegistryVersionedService1.setName("AvroSchemaRegistry");
+
schemaRegistryVersionedService1.setControllerServiceApis(Collections.singletonList(schemaRegistryServiceApi));
+
+ when(flowMapper.mapControllerService(schemaRegistryServiceNode1,
controllerServiceProvider, Collections.emptySet(), Collections.emptyMap()))
+ .thenReturn(schemaRegistryVersionedService1);
+
+ final VersionedControllerService schemaRegistryVersionedService2 = new
VersionedControllerService();
+
schemaRegistryVersionedService2.setIdentifier("external-schema-registry-2");
+ schemaRegistryVersionedService2.setName("AvroSchemaRegistry");
+
schemaRegistryVersionedService2.setControllerServiceApis(Collections.singletonList(schemaRegistryServiceApi));
+
+ when(flowMapper.mapControllerService(schemaRegistryServiceNode2,
controllerServiceProvider, Collections.emptySet(), Collections.emptyMap()))
+ .thenReturn(schemaRegistryVersionedService2);
+
+ // Resolve inherited services
+ serviceResolver.resolveInheritedControllerServices(snapshotContainer,
parentGroup.getIdentifier(), nifiUser);
+
+ // Verify the SR id in the AvroReader has not been resolved due to
there being two possible choices
+ final String avroReaderSchemaRegistryIdAfterResolution =
avroReader.getProperties().get("schema-registry");
+ assertNotNull(avroReaderSchemaRegistryIdAfterResolution);
+ assertEquals(avroReaderSchemaRegistryId,
avroReaderSchemaRegistryIdAfterResolution);
+ }
+
+ private RegisteredFlowSnapshot loadSnapshot(final String snapshotFile)
throws IOException {
+ return objectMapper.readValue(new File(snapshotFile),
RegisteredFlowSnapshot.class);
+ }
+
+ private VersionedProcessGroup findChildGroupByName(final
VersionedProcessGroup group, final String childGroupName) {
+ if (group.getProcessGroups() == null ||
group.getProcessGroups().isEmpty()) {
+ return null;
+ }
+
+ for (final VersionedProcessGroup childGroup :
group.getProcessGroups()) {
+ if (childGroup.getName().equals(childGroupName)) {
+ return childGroup;
+ }
+ }
+
+ for (final VersionedProcessGroup childGroup :
group.getProcessGroups()) {
+ final VersionedProcessGroup matchingChild =
findChildGroupByName(childGroup, childGroupName);
+ if (matchingChild != null) {
+ return matchingChild;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Looks up a controller service by name among the services declared directly on the
+ * given group (nested groups are not searched).
+ *
+ * @param group the group whose own controller services are inspected
+ * @param serviceName the service name to match
+ * @return the first service with that name, or null when the group has no service
+ * collection or no service matches
+ */
+ private VersionedControllerService findServiceByName(final VersionedProcessGroup group, final String serviceName) {
+ if (group.getControllerServices() != null) {
+ for (final VersionedControllerService candidate : group.getControllerServices()) {
+ if (candidate.getName().equals(serviceName)) {
+ return candidate;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Looks up a processor by name among the processors declared directly on the given
+ * group (nested groups are not searched).
+ *
+ * @param group the group whose own processors are inspected
+ * @param processorName the processor name to match
+ * @return the first processor with that name, or null when the group has no processor
+ * collection or no processor matches
+ */
+ private VersionedProcessor findProcessorByName(final VersionedProcessGroup group, final String processorName) {
+ if (group.getProcessors() != null) {
+ for (final VersionedProcessor candidate : group.getProcessors()) {
+ if (candidate.getName().equals(processorName)) {
+ return candidate;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Builds a ControllerServiceAPI descriptor for the given type and bundle coordinates.
+ *
+ * @param type fully qualified name of the service API type
+ * @param group bundle group
+ * @param artifact bundle artifact
+ * @param version bundle version
+ * @return a new ControllerServiceAPI carrying the type and a Bundle built from the coordinates
+ */
+ private ControllerServiceAPI createServiceApi(final String type, final String group, final String artifact, final String version) {
+ final Bundle apiBundle = new Bundle(group, artifact, version);
+
+ final ControllerServiceAPI api = new ControllerServiceAPI();
+ api.setBundle(apiBundle);
+ api.setType(type);
+ return api;
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/standard-external-service-reference/convert-record-external-schema-registry.json
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/standard-external-service-reference/convert-record-external-schema-registry.json
new file mode 100644
index 0000000000..372493850b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/standard-external-service-reference/convert-record-external-schema-registry.json
@@ -0,0 +1,321 @@
+{
+ "externalControllerServices": {
+ "8a67de89-f5e7-3e42-892b-cfddad74c496": {
+ "identifier": "8a67de89-f5e7-3e42-892b-cfddad74c496",
+ "name": "AvroSchemaRegistry"
+ }
+ },
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [],
+ "controllerServices": [
+ {
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-record-serialization-services-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "comments": "",
+ "componentType": "CONTROLLER_SERVICE",
+ "controllerServiceApis": [
+ {
+ "bundle": {
+ "artifact": "nifi-standard-services-api-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "type": "org.apache.nifi.serialization.RecordReaderFactory"
+ }
+ ],
+ "groupIdentifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
+ "identifier": "3e1ca514-b79e-3b6a-acec-4efb66b8d922",
+ "instanceIdentifier": "c9a88942-0187-1000-dfb0-14f70307d4f8",
+ "name": "AvroReader",
+ "properties": {
+ "schema-name": "${schema.name}",
+ "cache-size": "1000",
+ "schema-registry": "8a67de89-f5e7-3e42-892b-cfddad74c496",
+ "schema-access-strategy": "schema-name",
+ "schema-text": "${avro.schema}"
+ },
+ "propertyDescriptors": {
+ "schema-branch": {
+ "displayName": "Schema Branch",
+ "identifiesControllerService": false,
+ "name": "schema-branch",
+ "sensitive": false
+ },
+ "schema-name": {
+ "displayName": "Schema Name",
+ "identifiesControllerService": false,
+ "name": "schema-name",
+ "sensitive": false
+ },
+ "cache-size": {
+ "displayName": "Cache Size",
+ "identifiesControllerService": false,
+ "name": "cache-size",
+ "sensitive": false
+ },
+ "schema-registry": {
+ "displayName": "Schema Registry",
+ "identifiesControllerService": true,
+ "name": "schema-registry",
+ "sensitive": false
+ },
+ "schema-access-strategy": {
+ "displayName": "Schema Access Strategy",
+ "identifiesControllerService": false,
+ "name": "schema-access-strategy",
+ "sensitive": false
+ },
+ "schema-version": {
+ "displayName": "Schema Version",
+ "identifiesControllerService": false,
+ "name": "schema-version",
+ "sensitive": false
+ },
+ "schema-text": {
+ "displayName": "Schema Text",
+ "identifiesControllerService": false,
+ "name": "schema-text",
+ "sensitive": false
+ }
+ },
+ "scheduledState": "DISABLED",
+ "type": "org.apache.nifi.avro.AvroReader"
+ },
+ {
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-record-serialization-services-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "componentType": "CONTROLLER_SERVICE",
+ "controllerServiceApis": [
+ {
+ "bundle": {
+ "artifact": "nifi-standard-services-api-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "type": "org.apache.nifi.serialization.RecordSetWriterFactory"
+ }
+ ],
+ "groupIdentifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
+ "identifier": "77c4ccd4-4dc1-3a2a-8fd3-6b9c020ef6dc",
+ "instanceIdentifier": "c9a912c4-0187-1000-b58b-295f675c6fc6",
+ "name": "JsonRecordSetWriter",
+ "properties": {
+ "compression-level": "1",
+ "Pretty Print JSON": "false",
+ "compression-format": "none",
+ "Schema Write Strategy": "no-schema",
+ "suppress-nulls": "never-suppress",
+ "output-grouping": "output-array",
+ "schema-name": "${schema.name}",
+ "schema-access-strategy": "inherit-record-schema",
+ "schema-protocol-version": "1",
+ "schema-text": "${avro.schema}"
+ },
+ "propertyDescriptors": {
+ "schema-branch": {
+ "displayName": "Schema Branch",
+ "identifiesControllerService": false,
+ "name": "schema-branch",
+ "sensitive": false
+ },
+ "compression-level": {
+ "displayName": "Compression Level",
+ "identifiesControllerService": false,
+ "name": "compression-level",
+ "sensitive": false
+ },
+ "schema-cache": {
+ "displayName": "Schema Cache",
+ "identifiesControllerService": true,
+ "name": "schema-cache",
+ "sensitive": false
+ },
+ "Timestamp Format": {
+ "displayName": "Timestamp Format",
+ "identifiesControllerService": false,
+ "name": "Timestamp Format",
+ "sensitive": false
+ },
+ "Date Format": {
+ "displayName": "Date Format",
+ "identifiesControllerService": false,
+ "name": "Date Format",
+ "sensitive": false
+ },
+ "Pretty Print JSON": {
+ "displayName": "Pretty Print JSON",
+ "identifiesControllerService": false,
+ "name": "Pretty Print JSON",
+ "sensitive": false
+ },
+ "compression-format": {
+ "displayName": "Compression Format",
+ "identifiesControllerService": false,
+ "name": "compression-format",
+ "sensitive": false
+ },
+ "Schema Write Strategy": {
+ "displayName": "Schema Write Strategy",
+ "identifiesControllerService": false,
+ "name": "Schema Write Strategy",
+ "sensitive": false
+ },
+ "suppress-nulls": {
+ "displayName": "Suppress Null Values",
+ "identifiesControllerService": false,
+ "name": "suppress-nulls",
+ "sensitive": false
+ },
+ "output-grouping": {
+ "displayName": "Output Grouping",
+ "identifiesControllerService": false,
+ "name": "output-grouping",
+ "sensitive": false
+ },
+ "schema-name": {
+ "displayName": "Schema Name",
+ "identifiesControllerService": false,
+ "name": "schema-name",
+ "sensitive": false
+ },
+ "schema-registry": {
+ "displayName": "Schema Registry",
+ "identifiesControllerService": true,
+ "name": "schema-registry",
+ "sensitive": false
+ },
+ "Time Format": {
+ "displayName": "Time Format",
+ "identifiesControllerService": false,
+ "name": "Time Format",
+ "sensitive": false
+ },
+ "schema-access-strategy": {
+ "displayName": "Schema Access Strategy",
+ "identifiesControllerService": false,
+ "name": "schema-access-strategy",
+ "sensitive": false
+ },
+ "schema-protocol-version": {
+ "displayName": "Schema Protocol Version",
+ "identifiesControllerService": false,
+ "name": "schema-protocol-version",
+ "sensitive": false
+ },
+ "schema-version": {
+ "displayName": "Schema Version",
+ "identifiesControllerService": false,
+ "name": "schema-version",
+ "sensitive": false
+ },
+ "schema-text": {
+ "displayName": "Schema Text",
+ "identifiesControllerService": false,
+ "name": "schema-text",
+ "sensitive": false
+ }
+ },
+ "scheduledState": "DISABLED",
+ "type": "org.apache.nifi.json.JsonRecordSetWriter"
+ }
+ ],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "identifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
+ "inputPorts": [],
+ "instanceIdentifier": "c9a82721-0187-1000-d065-cc6f16cb06d7",
+ "labels": [],
+ "name": "Convert Record - External Schema Registry",
+ "outputPorts": [],
+ "position": {
+ "x": 312.0,
+ "y": 222.0
+ },
+ "processGroups": [],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-standard-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
+ "identifier": "38cde0b4-b4b3-3481-9b90-504f9386022c",
+ "instanceIdentifier": "c9a84b1c-0187-1000-c999-1b1b1cba8843",
+ "maxBackoffPeriod": "10 mins",
+ "name": "ConvertRecord",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 427.0,
+ "y": 288.0
+ },
+ "properties": {
+ "record-writer": "77c4ccd4-4dc1-3a2a-8fd3-6b9c020ef6dc",
+ "record-reader": "3e1ca514-b79e-3b6a-acec-4efb66b8d922",
+ "include-zero-record-flowfiles": "true"
+ },
+ "propertyDescriptors": {
+ "record-writer": {
+ "displayName": "Record Writer",
+ "identifiesControllerService": true,
+ "name": "record-writer",
+ "sensitive": false
+ },
+ "record-reader": {
+ "displayName": "Record Reader",
+ "identifiesControllerService": true,
+ "name": "record-reader",
+ "sensitive": false
+ },
+ "include-zero-record-flowfiles": {
+ "displayName": "Include Zero Record FlowFiles",
+ "identifiesControllerService": false,
+ "name": "include-zero-record-flowfiles",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.standard.ConvertRecord",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterContexts": {},
+ "parameterProviders": {},
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "comments": "",
+ "timestamp": 1682715543022,
+ "version": 1
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/versioned-child-services-from-parent/child.json
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/versioned-child-services-from-parent/child.json
new file mode 100644
index 0000000000..2b35c97a8c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/versioned-child-services-from-parent/child.json
@@ -0,0 +1,230 @@
+{
+ "externalControllerServices": {
+ "e314fb24-91a4-38c5-af94-520483cf0579": {
+ "identifier": "e314fb24-91a4-38c5-af94-520483cf0579",
+ "name": "MyAvroRecordSetWriter"
+ },
+ "9a674922-376f-367c-9dc3-095f8014223e": {
+ "identifier": "9a674922-376f-367c-9dc3-095f8014223e",
+ "name": "MyAvroReader"
+ }
+ },
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "comments": "",
+ "groupId": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
+ "id": "afe169ce-aada-3055-9489-cd40bde6789d",
+ "instanceIdentifier": "1117d8ab-8613-3675-83ef-fb6f30b891cb",
+ "name": "ConvertRecord",
+ "type": "PROCESSOR"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
+ "identifier": "bab085dc-1e3d-3ff4-8c98-61e2ac2da897",
+ "instanceIdentifier": "b13d9099-7f12-373c-8900-c653d9a5acf0",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ "success"
+ ],
+ "source": {
+ "comments": "",
+ "groupId": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
+ "id": "c84ea0f1-31d6-3c66-a431-526a13ecd953",
+ "instanceIdentifier": "0c16178a-f864-3b60-11e4-9b5f6d8af199",
+ "name": "GenerateFlowFile",
+ "type": "PROCESSOR"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "identifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
+ "inputPorts": [],
+ "instanceIdentifier": "a44c6370-0187-1000-d0cf-e5bc93fd74d3",
+ "labels": [],
+ "name": "Child",
+ "outputPorts": [],
+ "position": {
+ "x": 502.0,
+ "y": 283.0
+ },
+ "processGroups": [],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-standard-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
+ "identifier": "c84ea0f1-31d6-3c66-a431-526a13ecd953",
+ "instanceIdentifier": "0c16178a-f864-3b60-11e4-9b5f6d8af199",
+ "maxBackoffPeriod": "10 mins",
+ "name": "GenerateFlowFile",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 432.0,
+ "y": 64.0
+ },
+ "properties": {
+ "character-set": "UTF-8",
+ "File Size": "0B",
+ "Batch Size": "1",
+ "Unique FlowFiles": "false",
+ "Data Format": "Text"
+ },
+ "propertyDescriptors": {
+ "character-set": {
+ "displayName": "Character Set",
+ "identifiesControllerService": false,
+ "name": "character-set",
+ "sensitive": false
+ },
+ "File Size": {
+ "displayName": "File Size",
+ "identifiesControllerService": false,
+ "name": "File Size",
+ "sensitive": false
+ },
+ "mime-type": {
+ "displayName": "Mime Type",
+ "identifiesControllerService": false,
+ "name": "mime-type",
+ "sensitive": false
+ },
+ "generate-ff-custom-text": {
+ "displayName": "Custom Text",
+ "identifiesControllerService": false,
+ "name": "generate-ff-custom-text",
+ "sensitive": false
+ },
+ "Batch Size": {
+ "displayName": "Batch Size",
+ "identifiesControllerService": false,
+ "name": "Batch Size",
+ "sensitive": false
+ },
+ "Unique FlowFiles": {
+ "displayName": "Unique FlowFiles",
+ "identifiesControllerService": false,
+ "name": "Unique FlowFiles",
+ "sensitive": false
+ },
+ "Data Format": {
+ "displayName": "Data Format",
+ "identifiesControllerService": false,
+ "name": "Data Format",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "1 min",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
+ "yieldDuration": "1 sec"
+ },
+ {
+ "autoTerminatedRelationships": [
+ "success",
+ "failure"
+ ],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-standard-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
+ "identifier": "afe169ce-aada-3055-9489-cd40bde6789d",
+ "instanceIdentifier": "1117d8ab-8613-3675-83ef-fb6f30b891cb",
+ "maxBackoffPeriod": "10 mins",
+ "name": "ConvertRecord",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 416.0,
+ "y": 312.0
+ },
+ "properties": {
+ "record-writer": "e314fb24-91a4-38c5-af94-520483cf0579",
+ "record-reader": "9a674922-376f-367c-9dc3-095f8014223e",
+ "include-zero-record-flowfiles": "true"
+ },
+ "propertyDescriptors": {
+ "record-writer": {
+ "displayName": "Record Writer",
+ "identifiesControllerService": true,
+ "name": "record-writer",
+ "sensitive": false
+ },
+ "record-reader": {
+ "displayName": "Record Reader",
+ "identifiesControllerService": true,
+ "name": "record-reader",
+ "sensitive": false
+ },
+ "include-zero-record-flowfiles": {
+ "displayName": "Include Zero Record FlowFiles",
+ "identifiesControllerService": false,
+ "name": "include-zero-record-flowfiles",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.standard.ConvertRecord",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterContexts": {},
+ "parameterProviders": {},
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "comments": "",
+ "timestamp": 1682088715936,
+ "version": 2
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/versioned-child-services-from-parent/parent.json
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/versioned-child-services-from-parent/parent.json
new file mode 100644
index 0000000000..6ce6304a37
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/resources/snapshots/versioned-child-services-from-parent/parent.json
@@ -0,0 +1,260 @@
+{
+ "externalControllerServices": {},
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [],
+ "controllerServices": [
+ {
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-record-serialization-services-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "comments": "",
+ "componentType": "CONTROLLER_SERVICE",
+ "controllerServiceApis": [
+ {
+ "bundle": {
+ "artifact": "nifi-standard-services-api-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "type": "org.apache.nifi.serialization.RecordSetWriterFactory"
+ }
+ ],
+ "groupIdentifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
+ "identifier": "faddf543-c1f4-3c9c-ab99-dc17a7922e2b",
+ "instanceIdentifier": "a44aed77-0187-1000-ce63-3170a105bc81",
+ "name": "MyAvroRecordSetWriter",
+ "properties": {
+ "compression-format": "NONE",
+ "Schema Write Strategy": "avro-embedded",
+ "schema-name": "${schema.name}",
+ "cache-size": "1000",
+ "schema-access-strategy": "inherit-record-schema",
+ "schema-protocol-version": "1",
+ "encoder-pool-size": "32",
+ "schema-text": "${avro.schema}"
+ },
+ "propertyDescriptors": {
+ "compression-format": {
+ "displayName": "Compression Format",
+ "identifiesControllerService": false,
+ "name": "compression-format",
+ "sensitive": false
+ },
+ "Schema Write Strategy": {
+ "displayName": "Schema Write Strategy",
+ "identifiesControllerService": false,
+ "name": "Schema Write Strategy",
+ "sensitive": false
+ },
+ "schema-branch": {
+ "displayName": "Schema Branch",
+ "identifiesControllerService": false,
+ "name": "schema-branch",
+ "sensitive": false
+ },
+ "schema-name": {
+ "displayName": "Schema Name",
+ "identifiesControllerService": false,
+ "name": "schema-name",
+ "sensitive": false
+ },
+ "cache-size": {
+ "displayName": "Cache Size",
+ "identifiesControllerService": false,
+ "name": "cache-size",
+ "sensitive": false
+ },
+ "schema-registry": {
+ "displayName": "Schema Registry",
+ "identifiesControllerService": true,
+ "name": "schema-registry",
+ "sensitive": false
+ },
+ "schema-access-strategy": {
+ "displayName": "Schema Access Strategy",
+ "identifiesControllerService": false,
+ "name": "schema-access-strategy",
+ "sensitive": false
+ },
+ "schema-protocol-version": {
+ "displayName": "Schema Protocol Version",
+ "identifiesControllerService": false,
+ "name": "schema-protocol-version",
+ "sensitive": false
+ },
+ "schema-version": {
+ "displayName": "Schema Version",
+ "identifiesControllerService": false,
+ "name": "schema-version",
+ "sensitive": false
+ },
+ "encoder-pool-size": {
+ "displayName": "Encoder Pool Size",
+ "identifiesControllerService": false,
+ "name": "encoder-pool-size",
+ "sensitive": false
+ },
+ "schema-cache": {
+ "displayName": "Schema Cache",
+ "identifiesControllerService": true,
+ "name": "schema-cache",
+ "sensitive": false
+ },
+ "schema-text": {
+ "displayName": "Schema Text",
+ "identifiesControllerService": false,
+ "name": "schema-text",
+ "sensitive": false
+ }
+ },
+ "scheduledState": "DISABLED",
+ "type": "org.apache.nifi.avro.AvroRecordSetWriter"
+ },
+ {
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-record-serialization-services-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "comments": "",
+ "componentType": "CONTROLLER_SERVICE",
+ "controllerServiceApis": [
+ {
+ "bundle": {
+ "artifact": "nifi-standard-services-api-nar",
+ "group": "org.apache.nifi",
+ "version": "2.0.0-SNAPSHOT"
+ },
+ "type": "org.apache.nifi.serialization.RecordReaderFactory"
+ }
+ ],
+ "groupIdentifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
+ "identifier": "317bbb66-135a-398a-8101-976d14ae0882",
+ "instanceIdentifier": "a44ae24d-0187-1000-cf90-4d00cd5b71dc",
+ "name": "MyAvroReader",
+ "properties": {
+ "schema-name": "${schema.name}",
+ "cache-size": "1000",
+ "schema-access-strategy": "embedded-avro-schema",
+ "schema-text": "${avro.schema}"
+ },
+ "propertyDescriptors": {
+ "schema-branch": {
+ "displayName": "Schema Branch",
+ "identifiesControllerService": false,
+ "name": "schema-branch",
+ "sensitive": false
+ },
+ "schema-name": {
+ "displayName": "Schema Name",
+ "identifiesControllerService": false,
+ "name": "schema-name",
+ "sensitive": false
+ },
+ "cache-size": {
+ "displayName": "Cache Size",
+ "identifiesControllerService": false,
+ "name": "cache-size",
+ "sensitive": false
+ },
+ "schema-registry": {
+ "displayName": "Schema Registry",
+ "identifiesControllerService": true,
+ "name": "schema-registry",
+ "sensitive": false
+ },
+ "schema-access-strategy": {
+ "displayName": "Schema Access Strategy",
+ "identifiesControllerService": false,
+ "name": "schema-access-strategy",
+ "sensitive": false
+ },
+ "schema-version": {
+ "displayName": "Schema Version",
+ "identifiesControllerService": false,
+ "name": "schema-version",
+ "sensitive": false
+ },
+ "schema-text": {
+ "displayName": "Schema Text",
+ "identifiesControllerService": false,
+ "name": "schema-text",
+ "sensitive": false
+ }
+ },
+ "scheduledState": "DISABLED",
+ "type": "org.apache.nifi.avro.AvroReader"
+ }
+ ],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "identifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
+ "inputPorts": [],
+ "instanceIdentifier": "a44ac702-0187-1000-a1cd-1c10c796f849",
+ "labels": [],
+ "name": "Parent",
+ "outputPorts": [],
+ "position": {
+ "x": 348.0,
+ "y": 161.0
+ },
+ "processGroups": [
+ {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "groupIdentifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
+ "identifier": "c1976be6-e5d7-31b5-aaa8-64a44810c9d0",
+ "inputPorts": [],
+ "instanceIdentifier": "a44b3017-0187-1000-6f44-3881b5ac4217",
+ "labels": [],
+ "name": "Child",
+ "outputPorts": [],
+ "position": {
+ "x": 433.0,
+ "y": 266.0
+ },
+ "processGroups": [],
+ "processors": [],
+ "remoteProcessGroups": [],
+ "variables": {},
+ "versionedFlowCoordinates": {
+ "bucketId": "6e767fa6-5113-46f0-9b7e-8024b75a40c1",
+ "flowId": "2192834e-60d7-413b-9ea9-dc4187824a41",
+ "registryUrl": "http://localhost:18080",
+ "storageLocation":
"http://localhost:18080/nifi-registry-api/buckets/6e767fa6-5113-46f0-9b7e-8024b75a40c1/flows/2192834e-60d7-413b-9ea9-dc4187824a41/versions/2",
+ "version": 2
+ }
+ }
+ ],
+ "processors": [],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterContexts": {},
+ "parameterProviders": {},
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "comments": "",
+ "timestamp": 1682088790173,
+ "version": 2
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceApiLookup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceApiLookup.java
new file mode 100644
index 0000000000..d11b033b31
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceApiLookup.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+
+import java.util.Map;
+
+public interface ControllerServiceApiLookup {
+
+ /**
+ * Returns the required service APIs for any property descriptors in the
component specified by the
+ * given type and bundle. The key of the returned Map is the name of the
PropertyDescriptor.
+ *
+ * @param type a component type
+ * @param bundle a component bundle
+ * @return the required service APIs for the component's property
descriptors, or empty Map if no compatible
+ * bundle could be found, or if the component doesn't require
any service APIs
+ */
+ Map<String, ControllerServiceAPI> getRequiredServiceApis(String type,
Bundle bundle);
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceResolver.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceResolver.java
new file mode 100644
index 0000000000..dc7cb81315
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceResolver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
+
+public interface ControllerServiceResolver {
+
+ /**
+ * Resolves controller service references in any processors or controller
services that exist in the flow contents
+ * of the top level snapshot provided in the flow snapshot container.
+ *
+ * The resolution looks for a service in the same group, or ancestor
group, that provides the required API and has
+ * the same name as referenced in the original snapshot. If only one such
service exists, it is selected and the
+ * value of the component's property descriptor is updated. If more than
one service exists, the value of the
+ * component's property descriptor is not modified.
+ *
+ * @param flowSnapshotContainer the container encapsulating the top level
snapshot being imported, as well as the
+ * snapshots of any child groups that are
also under version control
+ * @param parentGroupId the id of the process group where the snapshot is
being imported
+ * @param user the user performing the import
+ */
+ void resolveInheritedControllerServices(FlowSnapshotContainer
flowSnapshotContainer, String parentGroupId, NiFiUser user);
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
index 03d30409f4..8fe50f2d4e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
@@ -44,7 +44,7 @@ public interface FlowRegistryClientNode extends ComponentNode
{
RegisteredFlow getFlow(FlowRegistryClientUserContext context, String
bucketId, String flowId) throws FlowRegistryException, IOException;
Set<RegisteredFlow> getFlows(FlowRegistryClientUserContext context, String
bucketId) throws FlowRegistryException, IOException;
- RegisteredFlowSnapshot getFlowContents(FlowRegistryClientUserContext
context, String bucketId, String flowId, int version, boolean fetchRemoteFlows)
throws FlowRegistryException, IOException;
+ FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext
context, String bucketId, String flowId, int version, boolean fetchRemoteFlows)
throws FlowRegistryException, IOException;
RegisteredFlowSnapshot registerFlowSnapshot(
FlowRegistryClientUserContext context,
RegisteredFlow flow,
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContainer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContainer.java
new file mode 100644
index 0000000000..3c2d075b6f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContainer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.flow.ParameterProviderReference;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Holds the results of recursively fetching the contents of a registered flow
snapshot where some child groups may
+ * also be under version control. The top level flow snapshot will have the
contents of all the child groups populated,
+ * and the child snapshots here provide a mechanism to obtain the
corresponding metadata for a version controlled
+ * child group, such as the external service references, etc.
+ */
+public class FlowSnapshotContainer {
+
+ private final RegisteredFlowSnapshot flowSnapshot;
+
+ /**
+ * The key of this Map is the id of the corresponding
VersionedProcessGroup within the contents of the top level snapshot
+ * (i.e. within flowSnapshot.getFlowContents()). Any child process group
under version control will have an entry in this Map.
+ */
+ private final Map<String, RegisteredFlowSnapshot> childSnapshotsByGroupId;
+
+ public FlowSnapshotContainer(final RegisteredFlowSnapshot flowSnapshot) {
+ this.flowSnapshot = Objects.requireNonNull(flowSnapshot);
+ if (this.flowSnapshot.getParameterContexts() == null) {
+ flowSnapshot.setParameterContexts(new HashMap<>());
+ }
+ if (this.flowSnapshot.getParameterProviders() == null) {
+ flowSnapshot.setParameterProviders(new HashMap<>());
+ }
+ this.childSnapshotsByGroupId = new HashMap<>();
+ }
+
+ /**
+ * @return the top level snapshot
+ */
+ public RegisteredFlowSnapshot getFlowSnapshot() {
+ return flowSnapshot;
+ }
+
+ /**
+ * Get the snapshot that was used to populate the given group in the top
level snapshot.
+ *
+ * @param groupId the id of a version controlled group within the top
level snapshot
+ * @return the snapshot used to populate that group
+ */
+ public RegisteredFlowSnapshot getChildSnapshot(final String groupId) {
+ return childSnapshotsByGroupId.get(groupId);
+ }
+
+ /**
+ * Adds a child snapshot to the container.
+ *
+ * @param childSnapshot the snapshot for the inner child PG
+ * @param destinationGroup the VersionedProcessGroup in the top level
snapshot where the child exists
+ */
+ public void addChildSnapshot(final RegisteredFlowSnapshot childSnapshot,
final VersionedProcessGroup destinationGroup) {
+ // We need to use the id of the group that the snapshot is being
copied into because that is how this Map
+ // will be accessed later; the id from the child snapshot's flow
contents group is not the same
+ childSnapshotsByGroupId.put(destinationGroup.getIdentifier(),
childSnapshot);
+
+ // Merge any parameter contexts and parameter providers from the child
into the top level snapshot
+ mergeParameterContexts(childSnapshot);
+ mergeParameterProviders(childSnapshot);
+ }
+
+ /**
+ * For each parameter context in the child snapshot:
+ * - Check if the top level snapshot has a parameter context with the
same name
+ * - If a context with the same name does not exist, then add the context
+ * - If a context with the same name does exist, then add any parameters
that don't already exist in the context
+ */
+ private void mergeParameterContexts(final RegisteredFlowSnapshot
childSnapshot) {
+ final Map<String, VersionedParameterContext> childContexts =
childSnapshot.getParameterContexts();
+ if (childContexts == null) {
+ return;
+ }
+
+ for (final Map.Entry<String, VersionedParameterContext>
childContextEntry : childContexts.entrySet()) {
+ final String childContextName = childContextEntry.getKey();
+ final VersionedParameterContext childContext =
childContextEntry.getValue();
+
+ final VersionedParameterContext matchingContext =
flowSnapshot.getParameterContexts().get(childContextName);
+ if (matchingContext == null) {
+ flowSnapshot.getParameterContexts().put(childContextName,
childContext);
+ } else {
+ if (matchingContext.getParameters() == null) {
+ matchingContext.setParameters(new HashSet<>());
+ }
+ final Set<VersionedParameter> childParameters =
childContext.getParameters() == null ? Collections.emptySet() :
childContext.getParameters();
+ for (final VersionedParameter childParameter :
childParameters) {
+ if
(!matchingContext.getParameters().contains(childParameter)) {
+ matchingContext.getParameters().add(childParameter);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * For each parameter provider reference in the child snapshot:
+ * - Check if the top level snapshot has a parameter provider reference
with the same id
+ * - If a provider reference does not exist with the same id, then add
the provider reference
+ */
+ private void mergeParameterProviders(final RegisteredFlowSnapshot
childSnapshot) {
+ final Map<String, ParameterProviderReference> childParamProviders =
childSnapshot.getParameterProviders();
+ if (childParamProviders == null) {
+ return;
+ }
+
+ for (final Map.Entry<String, ParameterProviderReference>
childProviderEntry : childParamProviders.entrySet()) {
+ final String childProviderId = childProviderEntry.getKey();
+ final ParameterProviderReference childProvider =
childProviderEntry.getValue();
+ flowSnapshot.getParameterProviders().putIfAbsent(childProviderId,
childProvider);
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/registry/flow/FlowSnapshotContainerTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/registry/flow/FlowSnapshotContainerTest.java
new file mode 100644
index 0000000000..c20aa60d85
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/registry/flow/FlowSnapshotContainerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.flow.ParameterProviderReference;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FlowSnapshotContainerTest {
+
+ public static final String CONTEXT_1_NAME = "Context 1";
+ public static final String CONTEXT_2_NAME = "Context 2";
+
+ public static final String CONTEXT_1_PARAM_A_NAME = "Context 1 - Param A";
+ public static final String CONTEXT_1_PARAM_B_NAME = "Context 1 - Param B";
+ public static final String CONTEXT_1_PARAM_C_NAME = "Context 1 - Param C";
+ public static final String CONTEXT_2_PARAM_D_NAME = "Context 2 - Param D";
+
+ public static final String PROVIDER_1_ID = "provider1";
+ public static final String PROVIDER_2_ID = "provider2";
+ public static final String PROVIDER_3_ID = "provider3";
+
+ private FlowSnapshotContainer flowSnapshotContainer;
+
+ @BeforeEach
+ public void setup() {
+ final VersionedParameter parameterA =
createParameter(CONTEXT_1_PARAM_A_NAME);
+ final VersionedParameter parameterB =
createParameter(CONTEXT_1_PARAM_B_NAME);
+ final VersionedParameterContext context1 =
createContext(CONTEXT_1_NAME, parameterA, parameterB);
+
+ final Map<String, VersionedParameterContext> topLevelParamContexts =
new HashMap<>();
+ topLevelParamContexts.put(context1.getName(), context1);
+
+ final ParameterProviderReference providerReference1 =
createProviderReference(PROVIDER_1_ID, "Provider 1");
+ final ParameterProviderReference providerReference2 =
createProviderReference(PROVIDER_2_ID, "Provider 2");
+
+ final Map<String, ParameterProviderReference>
topLevelProviderReferences = new HashMap<>();
+ topLevelProviderReferences.put(providerReference1.getIdentifier(),
providerReference1);
+ topLevelProviderReferences.put(providerReference2.getIdentifier(),
providerReference2);
+
+ final RegisteredFlowSnapshot topLevelSnapshot = new
RegisteredFlowSnapshot();
+ topLevelSnapshot.setFlowContents(new VersionedProcessGroup());
+ topLevelSnapshot.setParameterContexts(topLevelParamContexts);
+ topLevelSnapshot.setParameterProviders(topLevelProviderReferences);
+
+ flowSnapshotContainer = new FlowSnapshotContainer(topLevelSnapshot);
+ }
+
+ @Test
+ public void testAddChildSnapshot() {
+ // Child has same context with an additional parameter
+ final VersionedParameter parameterC =
createParameter(CONTEXT_1_PARAM_C_NAME);
+ final VersionedParameterContext childContext1 =
createContext(CONTEXT_1_NAME, parameterC);
+
+ // Child has an additional context that didn't exist in parent
+ final VersionedParameter parameterD =
createParameter(CONTEXT_2_PARAM_D_NAME);
+ final VersionedParameterContext childContext2 =
createContext(CONTEXT_2_NAME, parameterD);
+
+ final Map<String, VersionedParameterContext> childParamContexts = new
HashMap<>();
+ childParamContexts.put(childContext1.getName(), childContext1);
+ childParamContexts.put(childContext2.getName(), childContext2);
+
+ // Child has additional provider reference
+ final ParameterProviderReference providerReference3 =
createProviderReference(PROVIDER_3_ID, "Provider 3");
+
+ final Map<String, ParameterProviderReference> childProviderReferences
= new HashMap<>();
+ childProviderReferences.put(providerReference3.getIdentifier(),
providerReference3);
+
+ // Setup child snapshot
+ final RegisteredFlowSnapshot childSnapshot = new
RegisteredFlowSnapshot();
+ childSnapshot.setFlowContents(new VersionedProcessGroup());
+ childSnapshot.setParameterContexts(childParamContexts);
+ childSnapshot.setParameterProviders(childProviderReferences);
+
+ // Add child to container
+ final VersionedProcessGroup destGroup = new VersionedProcessGroup();
+ destGroup.setIdentifier(UUID.randomUUID().toString());
+ flowSnapshotContainer.addChildSnapshot(childSnapshot, destGroup);
+
+ // Verify get by group id
+ final RegisteredFlowSnapshot retrievedChildSnapshot =
flowSnapshotContainer.getChildSnapshot(destGroup.getIdentifier());
+ assertEquals(childSnapshot, retrievedChildSnapshot);
+
+ // Verify additional context was added
+ final Map<String, VersionedParameterContext> topLevelParamContexts =
flowSnapshotContainer.getFlowSnapshot().getParameterContexts();
+ assertEquals(2, topLevelParamContexts.size());
+ assertTrue(topLevelParamContexts.containsKey(CONTEXT_1_NAME));
+ assertTrue(topLevelParamContexts.containsKey(CONTEXT_2_NAME));
+
+ // Verify additional parameters added to context 1
+ final VersionedParameterContext context1 =
topLevelParamContexts.get(CONTEXT_1_NAME);
+ final Set<VersionedParameter> context1Parameters =
context1.getParameters();
+ assertEquals(3, context1Parameters.size());
+ assertNotNull(context1Parameters.stream().filter(p ->
p.getName().equals(CONTEXT_1_PARAM_A_NAME)).findFirst().orElse(null));
+ assertNotNull(context1Parameters.stream().filter(p ->
p.getName().equals(CONTEXT_1_PARAM_B_NAME)).findFirst().orElse(null));
+ assertNotNull(context1Parameters.stream().filter(p ->
p.getName().equals(CONTEXT_1_PARAM_C_NAME)).findFirst().orElse(null));
+
+ // Verify additional provider added
+ final Map<String, ParameterProviderReference> topLevelProviders =
flowSnapshotContainer.getFlowSnapshot().getParameterProviders();
+ assertEquals(3, topLevelProviders.size());
+ assertTrue(topLevelProviders.containsKey(PROVIDER_1_ID));
+ assertTrue(topLevelProviders.containsKey(PROVIDER_2_ID));
+ assertTrue(topLevelProviders.containsKey(PROVIDER_3_ID));
+ }
+
+ private VersionedParameter createParameter(final String name) {
+ final VersionedParameter parameter = new VersionedParameter();
+ parameter.setName(name);
+ return parameter;
+ }
+
+ private VersionedParameterContext createContext(final String name, final
VersionedParameter ... parameters) {
+ final VersionedParameterContext paramContext = new
VersionedParameterContext();
+ paramContext.setName(name);
+ paramContext.setParameters(new HashSet<>(Arrays.asList(parameters)));
+ return paramContext;
+ }
+
+ private ParameterProviderReference createProviderReference(final String
id, final String name) {
+ final ParameterProviderReference reference = new
ParameterProviderReference();
+ reference.setIdentifier(id);
+ reference.setName(name);
+ return reference;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b93aef24d3..86987447ca 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -113,8 +113,11 @@ import
org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceResolver;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceResolver;
+import org.apache.nifi.controller.service.StandardControllerServiceApiLookup;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.NodeStatus;
@@ -167,6 +170,7 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -278,6 +282,7 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
private final StandardControllerServiceProvider controllerServiceProvider;
+ private final StandardControllerServiceResolver controllerServiceResolver;
private final Authorizer authorizer;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
@@ -544,6 +549,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
flowManager = new StandardFlowManager(nifiProperties, sslContext,
this, flowFileEventRepository, parameterContextManager);
controllerServiceProvider = new
StandardControllerServiceProvider(processScheduler, bulletinRepository,
flowManager, extensionManager);
+ controllerServiceResolver = new
StandardControllerServiceResolver(authorizer, flowManager, new
NiFiRegistryFlowMapper(extensionManager),
+ controllerServiceProvider, new
StandardControllerServiceApiLookup(extensionManager));
flowManager.initialize(controllerServiceProvider);
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
@@ -2094,6 +2101,9 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
return controllerServiceProvider;
}
+ public ControllerServiceResolver getControllerServiceResolver() {
+ return controllerServiceResolver;
+ }
public VariableRegistry getVariableRegistry() {
return variableRegistry;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index fa4cd52079..2467f46b9e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -32,6 +32,7 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.parameter.ParameterGroupConfiguration;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.flow.VersionedParameterContext;
@@ -1636,7 +1637,7 @@ public interface NiFiServiceFacade {
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could
not be found
*/
- RegisteredFlowSnapshot
getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo,
boolean fetchRemoteFlows);
+ FlowSnapshotContainer
getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo,
boolean fetchRemoteFlows);
/**
* Get the latest Versioned Flow Snapshot from the registry for the
Process Group with the given ID
@@ -1646,7 +1647,7 @@ public interface NiFiServiceFacade {
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could
not be found
*/
- RegisteredFlowSnapshot getVersionedFlowSnapshotByGroupId(String
processGroupId);
+ FlowSnapshotContainer getVersionedFlowSnapshotByGroupId(String
processGroupId);
/**
* Get the current state of the Process Group with the given ID, converted
to a Versioned Flow Snapshot
@@ -2700,11 +2701,11 @@ public interface NiFiServiceFacade {
* For any Controller Service that is found in the given Versioned Process
Group, if that Controller Service is not itself included in the Versioned
Process Groups,
* attempts to find an existing Controller Service that matches the
definition. If any is found, the component within the Versioned Process Group
is updated to point
* to the existing service.
- * @param versionedFlowSnapshot the flow snapshot
+ * @param flowSnapshotContainer the flow snapshot container
* @param parentGroupId the ID of the Process Group from which the
Controller Services are inherited
* @param user the NiFi user on whose behalf the request is happening;
this user is used for validation so that only the Controller Services that the
user has READ permissions to are included
*/
- void resolveInheritedControllerServices(RegisteredFlowSnapshot
versionedFlowSnapshot, String parentGroupId, NiFiUser user);
+ void resolveInheritedControllerServices(FlowSnapshotContainer
flowSnapshotContainer, String parentGroupId, NiFiUser user);
/**
* For any Parameter Provider that is found in the given Versioned Process
Group, attempts to find an existing Parameter Provider that matches the
definition. If any is found,
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index cc64d0047f..c31540beff 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -71,7 +71,6 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
-import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ParameterProviderNode;
@@ -103,7 +102,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedComponent;
-import org.apache.nifi.flow.VersionedConfigurableComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
@@ -111,8 +109,6 @@ import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
-import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
@@ -144,6 +140,7 @@ import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryPermissions;
import org.apache.nifi.registry.flow.FlowRegistryUtil;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
@@ -4061,124 +4058,8 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public void resolveInheritedControllerServices(final
RegisteredFlowSnapshot versionedFlowSnapshot, final String processGroupId,
final NiFiUser user) {
- final VersionedProcessGroup versionedGroup =
versionedFlowSnapshot.getFlowContents();
- resolveInheritedControllerServices(versionedGroup, processGroupId,
versionedFlowSnapshot.getExternalControllerServices(), user);
- }
-
- private void resolveInheritedControllerServices(final
VersionedProcessGroup versionedGroup, final String processGroupId,
- final Map<String,
ExternalControllerServiceReference> externalControllerServiceReferences,
- final NiFiUser user) {
- final Set<String> availableControllerServiceIds =
findAllControllerServiceIds(versionedGroup);
- final ProcessGroup parentGroup =
processGroupDAO.getProcessGroup(processGroupId);
- final Set<ControllerServiceNode> serviceNodes =
parentGroup.getControllerServices(true).stream()
- .filter(service -> service.isAuthorized(authorizer,
RequestAction.READ, user))
- .collect(Collectors.toSet());
-
- final ExtensionManager extensionManager =
controllerFacade.getExtensionManager();
- for (final VersionedProcessor processor :
versionedGroup.getProcessors()) {
- final Optional<BundleCoordinate> compatibleBundle =
BundleUtils.getOptionalCompatibleBundle(extensionManager, processor.getType(),
BundleUtils.createBundleDto(processor.getBundle()));
- if (compatibleBundle.isPresent()) {
- final ConfigurableComponent tempComponent =
extensionManager.getTempComponent(processor.getType(), compatibleBundle.get());
- resolveInheritedControllerServices(processor,
availableControllerServiceIds, serviceNodes,
externalControllerServiceReferences, tempComponent::getPropertyDescriptor);
- }
- }
-
- for (final VersionedControllerService service :
versionedGroup.getControllerServices()) {
- final Optional<BundleCoordinate> compatibleBundle =
BundleUtils.getOptionalCompatibleBundle(extensionManager, service.getType(),
BundleUtils.createBundleDto(service.getBundle()));
- if (compatibleBundle.isPresent()) {
- final ConfigurableComponent tempComponent =
extensionManager.getTempComponent(service.getType(), compatibleBundle.get());
- resolveInheritedControllerServices(service,
availableControllerServiceIds, serviceNodes,
externalControllerServiceReferences, tempComponent::getPropertyDescriptor);
- }
- }
-
- for (final VersionedProcessGroup child :
versionedGroup.getProcessGroups()) {
- resolveInheritedControllerServices(child, processGroupId,
externalControllerServiceReferences, user);
- }
- }
-
-
- private void resolveInheritedControllerServices(final
VersionedConfigurableComponent component, final Set<String>
availableControllerServiceIds,
- final
Set<ControllerServiceNode> availableControllerServices,
- final Map<String,
ExternalControllerServiceReference> externalControllerServiceReferences,
- final Function<String,
PropertyDescriptor> descriptorLookup) {
- final Map<String, VersionedPropertyDescriptor> descriptors =
component.getPropertyDescriptors();
- final Map<String, String> properties = component.getProperties();
-
- resolveInheritedControllerServices(descriptors, properties,
availableControllerServiceIds, availableControllerServices,
externalControllerServiceReferences, descriptorLookup);
- }
-
-
- private void resolveInheritedControllerServices(final Map<String,
VersionedPropertyDescriptor> propertyDescriptors, final Map<String, String>
componentProperties,
- final Set<String>
availableControllerServiceIds, final Set<ControllerServiceNode>
availableControllerServices,
- final Map<String,
ExternalControllerServiceReference> externalControllerServiceReferences,
- final Function<String,
PropertyDescriptor> descriptorLookup) {
-
- for (final Map.Entry<String, String> entry : new
HashMap<>(componentProperties).entrySet()) {
- final String propertyName = entry.getKey();
- final String propertyValue = entry.getValue();
-
- final VersionedPropertyDescriptor propertyDescriptor =
propertyDescriptors.get(propertyName);
- if (propertyDescriptor == null) {
- continue;
- }
-
- if (!propertyDescriptor.getIdentifiesControllerService()) {
- continue;
- }
-
- // If the referenced Controller Service is available in this flow,
there is nothing to resolve.
- if (availableControllerServiceIds.contains(propertyValue)) {
- continue;
- }
-
- final ExternalControllerServiceReference externalServiceReference
= externalControllerServiceReferences == null ? null :
externalControllerServiceReferences.get(propertyValue);
- if (externalServiceReference == null) {
- continue;
- }
-
- final PropertyDescriptor descriptor =
descriptorLookup.apply(propertyName);
- if (descriptor == null) {
- continue;
- }
-
- final Class<? extends ControllerService> referencedServiceClass =
descriptor.getControllerServiceDefinition();
- if (referencedServiceClass == null) {
- continue;
- }
-
- final String externalControllerServiceName =
externalServiceReference.getName();
- final List<ControllerServiceNode> matchingControllerServices =
availableControllerServices.stream()
- .filter(service ->
service.getName().equals(externalControllerServiceName))
- .filter(service ->
referencedServiceClass.isAssignableFrom(service.getProxiedControllerService().getClass()))
- .collect(Collectors.toList());
-
- if (matchingControllerServices.size() != 1) {
- continue;
- }
-
- final ControllerServiceNode matchingServiceNode =
matchingControllerServices.get(0);
- final Optional<String> versionedComponentId =
matchingServiceNode.getVersionedComponentId();
- final String resolvedId =
versionedComponentId.orElseGet(matchingServiceNode::getIdentifier);
-
- componentProperties.put(propertyName, resolvedId);
- }
- }
-
- private Set<String> findAllControllerServiceIds(final
VersionedProcessGroup group) {
- final Set<String> ids = new HashSet<>();
- findAllControllerServiceIds(group, ids);
- return ids;
- }
-
- private void findAllControllerServiceIds(final VersionedProcessGroup
group, final Set<String> ids) {
- for (final VersionedControllerService service :
group.getControllerServices()) {
- ids.add(service.getIdentifier());
- }
-
- for (final VersionedProcessGroup childGroup :
group.getProcessGroups()) {
- findAllControllerServiceIds(childGroup, ids);
- }
+ public void resolveInheritedControllerServices(final FlowSnapshotContainer
flowSnapshotContainer, final String processGroupId, final NiFiUser user) {
+
controllerFacade.getControllerServiceResolver().resolveInheritedControllerServices(flowSnapshotContainer,
processGroupId, user);
}
@Override
@@ -5208,7 +5089,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public RegisteredFlowSnapshot getVersionedFlowSnapshotByGroupId(final
String processGroupId) {
+ public FlowSnapshotContainer getVersionedFlowSnapshotByGroupId(final
String processGroupId) {
final ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo =
processGroup.getVersionControlInformation();
@@ -5217,7 +5098,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public RegisteredFlowSnapshot getVersionedFlowSnapshot(final
VersionControlInformationDTO versionControlInfo, final boolean
fetchRemoteFlows) {
+ public FlowSnapshotContainer getVersionedFlowSnapshot(final
VersionControlInformationDTO versionControlInfo, final boolean
fetchRemoteFlows) {
return getVersionedFlowSnapshot(versionControlInfo.getRegistryId(),
versionControlInfo.getBucketId(), versionControlInfo.getFlowId(),
versionControlInfo.getVersion(), fetchRemoteFlows);
}
@@ -5231,16 +5112,15 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
* @param fetchRemoteFlows indicator to include remote flows when
retrieving the flow
* @return a VersionedFlowSnapshot from a registry with the given version
*/
- private RegisteredFlowSnapshot getVersionedFlowSnapshot(final String
registryId, final String bucketId, final String flowId,
+ private FlowSnapshotContainer getVersionedFlowSnapshot(final String
registryId, final String bucketId, final String flowId,
final Integer
flowVersion, final boolean fetchRemoteFlows) {
final FlowRegistryClientNode flowRegistry =
flowRegistryDAO.getFlowRegistryClient(registryId);
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow
Registry registered with identifier " + registryId);
}
- final RegisteredFlowSnapshot snapshot;
try {
- snapshot =
flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
bucketId, flowId, flowVersion, fetchRemoteFlows);
+ return
flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
bucketId, flowId, flowVersion, fetchRemoteFlows);
} catch (final FlowRegistryException e) {
logger.error(e.getMessage(), e);
throw new IllegalArgumentException("The Flow Registry with ID " +
registryId + " reports that no Flow exists with Bucket "
@@ -5248,8 +5128,6 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
} catch (final IOException ioe) {
throw new IllegalStateException("Failed to communicate with Flow
Registry when attempting to retrieve a versioned flow", ioe);
}
-
- return snapshot;
}
@Override
@@ -5330,8 +5208,9 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
if (registryGroup == null) {
try {
- final RegisteredFlowSnapshot versionedFlowSnapshot =
flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
+ final FlowSnapshotContainer flowSnapshotContainer =
flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
versionControlInfo.getBucketIdentifier(),
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true);
+ final RegisteredFlowSnapshot versionedFlowSnapshot =
flowSnapshotContainer.getFlowSnapshot();
registryGroup = versionedFlowSnapshot.getFlowContents();
} catch (final IOException | FlowRegistryException e) {
throw new NiFiCoreException("Failed to retrieve flow with Flow
Registry in order to calculate local differences due to " + e.getMessage(), e);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 4eb86091e4..6249872da4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -31,6 +31,7 @@ import
org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
@@ -135,12 +136,12 @@ public abstract class FlowUpdateResource<T extends
ProcessGroupDescriptorEntity,
* @param allowDirtyFlowUpdate allow updating a flow with
versioned changes present
* @param requestType the type of request
("replace-requests" or "update-requests")
* @param replicateUriPath the uri path to use for replicating
the request (differs from initial request uri)
- * @param flowSnapshotSupplier provides access to the flow
snapshot to be used for replacement
+ * @param flowSnapshotContainerSupplier provides access to the
flow snapshot to be used for replacement
* @return response containing status of the async request
*/
protected Response initiateFlowUpdate(final String groupId, final T
requestEntity, final boolean allowDirtyFlowUpdate,
final String requestType, final
String replicateUriPath,
- final
Supplier<RegisteredFlowSnapshot> flowSnapshotSupplier) {
+ final
Supplier<FlowSnapshotContainer> flowSnapshotContainerSupplier) {
// Verify the request
final RevisionDTO revisionDto =
requestEntity.getProcessGroupRevision();
if (revisionDto == null) {
@@ -185,14 +186,15 @@ public abstract class FlowUpdateResource<T extends
ProcessGroupDescriptorEntity,
// 13. Re-Start all Processors, Funnels, Ports that are affected and
not removed.
// Step 0: Obtain the versioned flow snapshot to use for the update
- final RegisteredFlowSnapshot flowSnapshot = flowSnapshotSupplier.get();
+ final FlowSnapshotContainer flowSnapshotContainer =
flowSnapshotContainerSupplier.get();
+ final RegisteredFlowSnapshot flowSnapshot =
flowSnapshotContainer.getFlowSnapshot();
// The new flow may not contain the same versions of components in
existing flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited
from the parent group, resolve those to point to the appropriate Controller
Service, if we are able to.
- serviceFacade.resolveInheritedControllerServices(flowSnapshot,
groupId, user);
+
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer,
groupId, user);
// If there are any Parameter Providers referenced by Parameter
Contexts, resolve these to point to the appropriate Parameter Provider, if we
are able to.
serviceFacade.resolveParameterProviders(flowSnapshot, user);
@@ -379,7 +381,14 @@ public abstract class FlowUpdateResource<T extends
ProcessGroupDescriptorEntity,
// Get the Original Flow Snapshot in case we fail to update and need
to rollback
// This only applies to flows that were under version control, update
may be called without version control
final VersionControlInformationEntity vciEntity =
serviceFacade.getVersionControlInformation(groupId);
- final RegisteredFlowSnapshot originalFlowSnapshot = vciEntity == null
? null :
serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(),
true);
+
+ final RegisteredFlowSnapshot originalFlowSnapshot;
+ if (vciEntity == null) {
+ originalFlowSnapshot = null;
+ } else {
+ final FlowSnapshotContainer originalFlowSnapshotContainer =
serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(),
true);
+ originalFlowSnapshot =
originalFlowSnapshotContainer.getFlowSnapshot();
+ }
try {
if (replicateRequest) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 126bc41ca3..33744e25d7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -58,6 +58,7 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryBucket;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
@@ -2012,7 +2013,8 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
if (versionControlInfo != null &&
requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process
Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
- final RegisteredFlowSnapshot flowSnapshot =
getFlowFromRegistry(versionControlInfo);
+ final FlowSnapshotContainer flowSnapshotContainer =
getFlowFromRegistry(versionControlInfo);
+ final RegisteredFlowSnapshot flowSnapshot =
flowSnapshotContainer.getFlowSnapshot();
// Step 3: Enrich version control info came from UI
if (flowSnapshot.getFlowContents() != null) {
@@ -2026,7 +2028,7 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are
inherited from the parent group, resolve those to point to the appropriate
Controller Service, if we are able to.
- serviceFacade.resolveInheritedControllerServices(flowSnapshot,
groupId, NiFiUserUtils.getNiFiUser());
+
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer,
groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter
Contexts, resolve these to point to the appropriate Parameter Provider, if we
are able to.
serviceFacade.resolveParameterProviders(flowSnapshot,
NiFiUserUtils.getNiFiUser());
@@ -2095,8 +2097,9 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
);
}
- private RegisteredFlowSnapshot getFlowFromRegistry(final
VersionControlInformationDTO versionControlInfo) {
- final RegisteredFlowSnapshot flowSnapshot =
serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
+ private FlowSnapshotContainer getFlowFromRegistry(final
VersionControlInformationDTO versionControlInfo) {
+ final FlowSnapshotContainer flowSnapshotContainer =
serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
+ final RegisteredFlowSnapshot flowSnapshot =
flowSnapshotContainer.getFlowSnapshot();
final FlowRegistryBucket bucket = flowSnapshot.getBucket();
final RegisteredFlow flow = flowSnapshot.getFlow();
@@ -2108,7 +2111,7 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
final VersionedFlowState flowState = flowSnapshot.isLatest() ?
VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
versionControlInfo.setState(flowState.name());
- return flowSnapshot;
+ return flowSnapshotContainer;
}
@@ -4170,8 +4173,9 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
versionedFlowSnapshot.setSnapshotMetadata(null);
sanitizeRegistryInfo(versionedFlowSnapshot.getFlowContents());
+ final FlowSnapshotContainer flowSnapshotContainer = new
FlowSnapshotContainer(versionedFlowSnapshot);
return initiateFlowUpdate(groupId, importEntity, true,
"replace-requests",
- "/nifi-api/process-groups/" + groupId + "/flow-contents",
importEntity::getVersionedFlowSnapshot);
+ "/nifi-api/process-groups/" + groupId + "/flow-contents", ()
-> flowSnapshotContainer);
}
/**
@@ -4286,7 +4290,8 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
// if there are any Controller Services referenced that are inherited
from the parent group,
// resolve those to point to the appropriate Controller Service, if we
are able to.
- serviceFacade.resolveInheritedControllerServices(deserializedSnapshot,
groupId, NiFiUserUtils.getNiFiUser());
+ final FlowSnapshotContainer flowSnapshotContainer = new
FlowSnapshotContainer(deserializedSnapshot);
+
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer,
groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter
Contexts, resolve these to point to the appropriate Parameter Provider, if we
are able to.
serviceFacade.resolveParameterProviders(deserializedSnapshot,
NiFiUserUtils.getNiFiUser());
@@ -4386,7 +4391,8 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
// if there are any Controller Services referenced that are inherited
from the parent group,
// resolve those to point to the appropriate Controller Service, if we
are able to.
-
serviceFacade.resolveInheritedControllerServices(versionedFlowSnapshot,
groupId, NiFiUserUtils.getNiFiUser());
+ final FlowSnapshotContainer flowSnapshotContainer = new
FlowSnapshotContainer(versionedFlowSnapshot);
+
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer,
groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter
Contexts, resolve these to point to the appropriate Parameter Provider, if we
are able to.
serviceFacade.resolveParameterProviders(versionedFlowSnapshot,
NiFiUserUtils.getNiFiUser());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index eab88cc365..2d4e8278f2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -33,6 +33,7 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.registry.flow.FlowRegistryBucket;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
@@ -164,7 +165,8 @@ public class VersionsResource extends
FlowUpdateResource<VersionControlInformati
});
// get the versioned flow
- final RegisteredFlowSnapshot versionedFlowSnapshot =
serviceFacade.getVersionedFlowSnapshotByGroupId(groupId);
+ final FlowSnapshotContainer snapshotContainer =
serviceFacade.getVersionedFlowSnapshotByGroupId(groupId);
+ final RegisteredFlowSnapshot versionedFlowSnapshot =
snapshotContainer.getFlowSnapshot();
final VersionedProcessGroup versionedProcessGroup =
versionedFlowSnapshot.getFlowContents();
final String flowName = versionedProcessGroup.getName();
@@ -1129,14 +1131,15 @@ public class VersionsResource extends
FlowUpdateResource<VersionControlInformati
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
- final RegisteredFlowSnapshot flowSnapshot =
serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(),
true);
+ final FlowSnapshotContainer flowSnapshotContainer =
serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(),
true);
+ final RegisteredFlowSnapshot flowSnapshot =
flowSnapshotContainer.getFlowSnapshot();
// The flow in the registry may not contain the same versions of
components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited
from the parent group, resolve those to point to the appropriate Controller
Service, if we are able to.
- serviceFacade.resolveInheritedControllerServices(flowSnapshot,
groupId, NiFiUserUtils.getNiFiUser());
+
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer,
groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter
Contexts, resolve these to point to the appropriate Parameter Provider, if we
are able to.
serviceFacade.resolveParameterProviders(flowSnapshot,
NiFiUserUtils.getNiFiUser());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index c97419a67a..9a5fbe7f2e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -54,6 +54,7 @@ import
org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceResolver;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -190,6 +191,10 @@ public class ControllerFacade implements Authorizable {
return flowController.getControllerServiceProvider();
}
+ public ControllerServiceResolver getControllerServiceResolver() {
+ return flowController.getControllerServiceResolver();
+ }
+
public ExtensionManager getExtensionManager() {
return flowController.getExtensionManager();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
index 10c76ba751..0f4c1999c5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
@@ -17,6 +17,7 @@
package org.apache.nifi.web.api;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
import org.apache.nifi.web.NiFiServiceFacade;
@@ -48,8 +49,8 @@ public class TestVersionsResource {
public void testExportFlowVersion() {
final String groupId = UUID.randomUUID().toString();
final RegisteredFlowSnapshot versionedFlowSnapshot =
mock(RegisteredFlowSnapshot.class);
-
-
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
+ final FlowSnapshotContainer snapshotContainer = new
FlowSnapshotContainer(versionedFlowSnapshot);
+
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(snapshotContainer);
final String flowName = "flowname";
final int flowVersion = 1;