This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 0588830 NIFI-6872: - Added UI versioned flow supportsDownload
functionality with download flow menu item - Added VersionsResource endpoint
for downloading versioned flow with registry-related info removed - Added
ProcessGroupResource endpoint for downloading current flow with
registry-related info removed - Added StandardNiFiServiceFacade functionality
for downloading both current and versioned flow - Added XmlTransient markers on
variables introduced by Instantiated model class [...]
0588830 is described below
commit 058883091cf2cae0ff344136d6d58dd8efe0fd6c
Author: Joe Ferner <[email protected]>
AuthorDate: Tue Nov 19 10:15:42 2019 -0500
NIFI-6872:
- Added UI versioned flow supportsDownload functionality with download flow
menu item
- Added VersionsResource endpoint for downloading versioned flow with
registry-related info removed
- Added ProcessGroupResource endpoint for downloading current flow with
registry-related info removed
- Added StandardNiFiServiceFacade functionality for downloading both
current and versioned flow
- Added XmlTransient markers on variables introduced by Instantiated model
classes so they do not appear in serialized download
- Updated NiFiRegistryFlowMapper.mapParameterContexts to handle mapping
nested parameter contexts for use in producing a complete VersionedFlowSnapshot
- Added ability for NiFiRegistryFlowMapper to map nested process groups
ignoring versioning for use in producing a complete VersionedFlowSnapshot
- Added unit tests where helpful
NIFI-6872: PR response...
- Updated mapParameterContexts to return a Map keyed by context name to enforce
uniqueness of contexts, since everything ultimately converted the result to a
map anyway. The VersionedParameterContext class from the registry model doesn't
currently implement hashCode/equals, so returning a Set wouldn't work.
- Updated assert calls to put expected value as first parameter and actual
as second parameter
- Added one-time password (OTP) support for the flow download endpoint to
support non-certificate-based authentication
This closes #3931
---
.../nifi/web/api/entity/VersionedFlowEntity.java | 4 +-
.../apache/nifi/registry/flow/FlowRegistry.java | 5 +-
.../nifi/registry/flow/RestBasedFlowRegistry.java | 14 +-
.../mapping/InstantiatedVersionedComponent.java | 6 +
.../mapping/InstantiatedVersionedProcessGroup.java | 4 +
.../flow/mapping/NiFiRegistryFlowMapper.java | 267 ++++---
.../integration/MockSingleFlowRegistryClient.java | 3 +-
.../flow/mapping/NiFiRegistryFlowMapperTest.java | 780 +++++++++++++++++++++
.../org/apache/nifi/web/NiFiServiceFacade.java | 26 +-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 134 ++--
.../apache/nifi/web/api/ApplicationResource.java | 22 +-
.../apache/nifi/web/api/ProcessGroupResource.java | 45 ++
.../org/apache/nifi/web/api/VersionsResource.java | 61 ++
.../nifi/web/StandardNiFiServiceFacadeTest.java | 78 ++-
.../nifi/web/api/TestProcessGroupResource.java | 62 ++
.../apache/nifi/web/api/TestVersionsResource.java | 81 +++
.../web/security/otp/OtpAuthenticationFilter.java | 5 +-
.../src/main/webapp/js/nf/canvas/nf-actions.js | 42 +-
.../main/webapp/js/nf/canvas/nf-context-menu.js | 177 ++---
19 files changed, 1506 insertions(+), 310 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java
index 26772ad..c6c19f3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java
@@ -31,7 +31,7 @@ public class VersionedFlowEntity extends Entity {
return versionedFlow;
}
- public void setVersionedFlow(VersionedFlowDTO versionedFLow) {
- this.versionedFlow = versionedFLow;
+ public void setVersionedFlow(VersionedFlowDTO versionedFlow) {
+ this.versionedFlow = versionedFlow;
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 92fc8a1..f1d0995 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -23,7 +23,6 @@ import org.apache.nifi.registry.client.NiFiRegistryException;
import java.io.IOException;
import java.util.Map;
-import java.util.Collection;
import java.util.Set;
public interface FlowRegistry {
@@ -134,7 +133,7 @@ public interface FlowRegistry {
* @param snapshot the snapshot of the flow
* @param externalControllerServices a mapping of of Controller Service
identifier to ExternalControllerServiceReference for any Controller Service
that is referenced by the flow but that are
* not included as part of the VersionedProcessGroup
- * @param parameterContexts the Parameter Contexts to include in the
snapshot
+ * @param parameterContexts a map of the Parameter Contexts to include in
the snapshot keyed by name
* @param comments any comments for the snapshot
* @param expectedVersion the version of the flow that we expect to save
this snapshot as
* @return the versioned flow snapshot
@@ -144,7 +143,7 @@ public interface FlowRegistry {
* @throws NiFiRegistryException if the flow does not exist
*/
VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow,
VersionedProcessGroup snapshot, Map<String, ExternalControllerServiceReference>
externalControllerServices,
-
Collection<VersionedParameterContext> parameterContexts, String comments,
+ Map<String,
VersionedParameterContext> parameterContexts, String comments,
int expectedVersion,
NiFiUser user) throws IOException, NiFiRegistryException;
/**
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index a7f87d2..35692e6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -29,14 +29,12 @@ import
org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
import javax.net.ssl.SSLContext;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
public class RestBasedFlowRegistry implements FlowRegistry {
- private static final String FLOW_ENCODING_VERSION = "1.0";
+ public static final String FLOW_ENCODING_VERSION = "1.0";
private final FlowRegistryClient flowRegistryClient;
private final String identifier;
@@ -179,17 +177,13 @@ public class RestBasedFlowRegistry implements
FlowRegistry {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final
VersionedFlow flow, final VersionedProcessGroup snapshot,
final
Map<String, ExternalControllerServiceReference> externalControllerServices,
-
Collection<VersionedParameterContext> parameterContexts, final String comments,
final int expectedVersion,
- final NiFiUser
user) throws IOException, NiFiRegistryException {
-
- final Map<String, VersionedParameterContext> parameterContextMap =
parameterContexts.stream()
- .collect(Collectors.toMap(VersionedParameterContext::getName,
context -> context));
-
+ final
Map<String, VersionedParameterContext> parameterContexts, final String comments,
+ final int
expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException
{
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
final VersionedFlowSnapshot versionedFlowSnapshot = new
VersionedFlowSnapshot();
versionedFlowSnapshot.setFlowContents(snapshot);
versionedFlowSnapshot.setExternalControllerServices(externalControllerServices);
- versionedFlowSnapshot.setParameterContexts(parameterContextMap);
+ versionedFlowSnapshot.setParameterContexts(parameterContexts);
versionedFlowSnapshot.setFlowEncodingVersion(FLOW_ENCODING_VERSION);
final VersionedFlowSnapshotMetadata metadata = new
VersionedFlowSnapshotMetadata();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java
index 15c620a..e655c16 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java
@@ -17,8 +17,14 @@
package org.apache.nifi.registry.flow.mapping;
+import javax.xml.bind.annotation.XmlTransient;
+
public interface InstantiatedVersionedComponent {
+
+ // mark transient so fields are ignored when serializing all versioned
component types
+ @XmlTransient
String getInstanceId();
+ @XmlTransient
String getInstanceGroupId();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
index 3eed715..967eec3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
@@ -20,9 +20,11 @@ package org.apache.nifi.registry.flow.mapping;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import javax.xml.bind.annotation.XmlTransient;
import java.util.Map;
public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup
implements InstantiatedVersionedComponent {
+
private final String instanceId;
private final String groupId;
private Map<String, ExternalControllerServiceReference>
externalControllerServiceReferences;
@@ -46,6 +48,8 @@ public class InstantiatedVersionedProcessGroup extends
VersionedProcessGroup imp
this.externalControllerServiceReferences =
externalControllerServiceReferences;
}
+ // mark transient so field is ignored when serializing this class
+ @XmlTransient
public Map<String, ExternalControllerServiceReference>
getExternalControllerServiceReferences() {
return externalControllerServiceReferences;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index 67be279..2488adf 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -27,8 +27,8 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -80,6 +80,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -90,7 +91,7 @@ public class NiFiRegistryFlowMapper {
// We need to keep a mapping of component id to versionedComponentId as we
transform these objects. This way, when
// we call #mapConnectable, instead of generating a new UUID for the
ConnectableComponent, we can lookup the 'versioned'
- // identifier based on the comopnent's actual id. We do connections last,
so that all components will already have been
+ // identifier based on the component's actual id. We do connections last,
so that all components will already have been
// created before attempting to create the connection, where the
ConnectableDTO is converted.
private Map<String, String> versionedComponentIds = new HashMap<>();
@@ -98,67 +99,89 @@ public class NiFiRegistryFlowMapper {
this.extensionManager = extensionManager;
}
- public InstantiatedVersionedProcessGroup mapProcessGroup(final
ProcessGroup group, final ControllerServiceProvider serviceProvider, final
FlowRegistryClient registryClient,
- final boolean
mapDescendantVersionedFlows) {
+ /**
+ * Map the given process group to a versioned process group without any
use of an actual flow registry even if the
+ * group is currently versioned in a registry.
+ *
+ * @param group the process group to map
+ * @param serviceProvider the controller service provider to use for
mapping
+ * @return a complete versioned process group without any registry related
details
+ */
+ public InstantiatedVersionedProcessGroup mapNonVersionedProcessGroup(final
ProcessGroup group, final ControllerServiceProvider serviceProvider) {
versionedComponentIds.clear();
- final InstantiatedVersionedProcessGroup mapped = mapGroup(group,
serviceProvider, registryClient, true, mapDescendantVersionedFlows);
- populateReferencedAncestorVariables(group, mapped);
-
- return mapped;
+ // always include descendant flows and do not apply any registry
versioning info that may be present in the group
+ return mapGroup(group, serviceProvider, (processGroup, versionedGroup)
-> true);
}
- private void populateReferencedAncestorVariables(final ProcessGroup group,
final VersionedProcessGroup versionedGroup) {
- final Set<String> ancestorVariableNames = new HashSet<>();
- populateVariableNames(group.getParent(), ancestorVariableNames);
-
- final Map<String, String> implicitlyDefinedVariables = new HashMap<>();
- for (final String variableName : ancestorVariableNames) {
- final boolean isReferenced =
!group.getComponentsAffectedByVariable(variableName).isEmpty();
- if (isReferenced) {
- final String value =
group.getVariableRegistry().getVariableValue(variableName);
- implicitlyDefinedVariables.put(variableName, value);
- }
- }
-
- if (!implicitlyDefinedVariables.isEmpty()) {
- // Merge the implicit variables with the explicitly defined
variables for the Process Group
- // and set those as the Versioned Group's variables.
- if (versionedGroup.getVariables() != null) {
-
implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
- }
+ /**
+ * Map the given process group to a versioned process group using the
provided registry client.
+ *
+ * @param group the process group to map
+ * @param serviceProvider the controller service provider to use for
mapping
+ * @param registryClient the registry client to use when retrieving
versioning details
+ * @param mapDescendantVersionedFlows true in order to include descendant
flows in the mapped result
+ * @return a complete versioned process group with applicable registry
related details
+ */
+ public InstantiatedVersionedProcessGroup mapProcessGroup(final
ProcessGroup group, final ControllerServiceProvider serviceProvider,
+ final
FlowRegistryClient registryClient,
+ final boolean
mapDescendantVersionedFlows) {
+ versionedComponentIds.clear();
- versionedGroup.setVariables(implicitlyDefinedVariables);
- }
- }
+ // apply registry versioning according to the lambda below
+ // NOTE: lambda refers to registry client and map descendant boolean
which will not change during recursion
+ return mapGroup(group, serviceProvider, (processGroup, versionedGroup)
-> {
+ final VersionControlInformation versionControlInfo =
processGroup.getVersionControlInformation();
+ if (versionControlInfo != null) {
+ final VersionedFlowCoordinates coordinates = new
VersionedFlowCoordinates();
+ final String registryId =
versionControlInfo.getRegistryIdentifier();
+ final FlowRegistry registry =
registryClient.getFlowRegistry(registryId);
+ if (registry == null) {
+ throw new IllegalStateException("Process Group refers to a
Flow Registry with ID " + registryId + " but no Flow Registry exists with that
ID. Cannot resolve to a URL.");
+ }
- private void populateVariableNames(final ProcessGroup group, final
Set<String> variableNames) {
- if (group == null) {
- return;
- }
+ coordinates.setRegistryUrl(registry.getURL());
+
coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
+ coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
+ coordinates.setVersion(versionControlInfo.getVersion());
+ versionedGroup.setVersionedFlowCoordinates(coordinates);
- group.getVariableRegistry().getVariableMap().keySet().stream()
- .map(VariableDescriptor::getName)
- .forEach(variableNames::add);
+ // We need to register the Port ID -> Versioned Component ID's
in our versionedComponentIds member variable for all input & output ports.
+ // Otherwise, we will not be able to lookup the port when
connecting to it.
+ for (final Port port : processGroup.getInputPorts()) {
+ getId(port.getVersionedComponentId(),
port.getIdentifier());
+ }
+ for (final Port port : processGroup.getOutputPorts()) {
+ getId(port.getVersionedComponentId(),
port.getIdentifier());
+ }
- populateVariableNames(group.getParent(), variableNames);
+ // If the Process Group itself is remotely versioned, then we
don't want to include its contents
+ // because the contents are remotely managed and not part of
the versioning of this Process Group
+ return mapDescendantVersionedFlows;
+ }
+ return true;
+ });
}
- private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup
group, final ControllerServiceProvider serviceProvider, final
FlowRegistryClient registryClient,
- final boolean topLevel,
final boolean mapDescendantVersionedFlows) {
-
+ private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup
group, final ControllerServiceProvider serviceProvider,
+ final
BiFunction<ProcessGroup, VersionedProcessGroup, Boolean>
applyVersionControlInfo) {
final Set<String> allIncludedGroupsIds =
group.findAllProcessGroups().stream()
- .map(ProcessGroup::getIdentifier)
- .collect(Collectors.toSet());
+ .map(ProcessGroup::getIdentifier)
+ .collect(Collectors.toSet());
allIncludedGroupsIds.add(group.getIdentifier());
final Map<String, ExternalControllerServiceReference>
externalControllerServiceReferences = new HashMap<>();
- return mapGroup(group, serviceProvider, registryClient, topLevel,
mapDescendantVersionedFlows, allIncludedGroupsIds,
externalControllerServiceReferences);
- }
+ final InstantiatedVersionedProcessGroup versionedGroup =
+ mapGroup(group, serviceProvider, applyVersionControlInfo,
true, allIncludedGroupsIds, externalControllerServiceReferences);
+ populateReferencedAncestorVariables(group, versionedGroup);
- private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup
group, final ControllerServiceProvider serviceProvider, final
FlowRegistryClient registryClient,
- final boolean topLevel,
final boolean mapDescendantVersionedFlows, final Set<String> includedGroupIds,
+ return versionedGroup;
+ }
+
+ private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup
group, final ControllerServiceProvider serviceProvider,
+ final
BiFunction<ProcessGroup, VersionedProcessGroup, Boolean>
applyVersionControlInfo,
+ final boolean topLevel,
final Set<String> includedGroupIds,
final Map<String,
ExternalControllerServiceReference> externalControllerServiceReferences) {
final InstantiatedVersionedProcessGroup versionedGroup = new
InstantiatedVersionedProcessGroup(group.getIdentifier(),
group.getProcessGroupIdentifier());
@@ -175,76 +198,52 @@ public class NiFiRegistryFlowMapper {
// then we don't want to include the RemoteFlowCoordinates; we want to
include the group contents. The RemoteFlowCoordinates will be used
// only for a child group that is itself version controlled.
if (!topLevel) {
- final VersionControlInformation versionControlInfo =
group.getVersionControlInformation();
- if (versionControlInfo != null) {
- final VersionedFlowCoordinates coordinates = new
VersionedFlowCoordinates();
- final String registryId =
versionControlInfo.getRegistryIdentifier();
- final FlowRegistry registry =
registryClient.getFlowRegistry(registryId);
- if (registry == null) {
- throw new IllegalStateException("Process Group refers to a
Flow Registry with ID " + registryId + " but no Flow Registry exists with that
ID. Cannot resolve to a URL.");
- }
-
- coordinates.setRegistryUrl(registry.getURL());
-
coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
- coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
- coordinates.setVersion(versionControlInfo.getVersion());
- versionedGroup.setVersionedFlowCoordinates(coordinates);
-
- // We need to register the Port ID -> Versioned Component ID's
in our versionedComponentIds member variable for all input & output ports.
- // Otherwise, we will not be able to lookup the port when
connecting to it.
- for (final Port port : group.getInputPorts()) {
- getId(port.getVersionedComponentId(),
port.getIdentifier());
- }
- for (final Port port : group.getOutputPorts()) {
- getId(port.getVersionedComponentId(),
port.getIdentifier());
- }
+ final boolean mapDescendantVersionedFlows =
applyVersionControlInfo.apply(group, versionedGroup);
- // If the Process Group itself is remotely versioned, then we
don't want to include its contents
- // because the contents are remotely managed and not part of
the versioning of this Process Group
- if (!mapDescendantVersionedFlows) {
- return versionedGroup;
- }
+ // return here if we do not want to include remotely managed
descendant flows
+ if (!mapDescendantVersionedFlows) {
+ return versionedGroup;
}
}
versionedGroup.setControllerServices(group.getControllerServices(false).stream()
- .map(service -> mapControllerService(service, serviceProvider,
includedGroupIds, externalControllerServiceReferences))
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(service -> mapControllerService(service, serviceProvider,
includedGroupIds, externalControllerServiceReferences))
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setFunnels(group.getFunnels().stream()
- .map(this::mapFunnel)
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(this::mapFunnel)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setInputPorts(group.getInputPorts().stream()
- .map(this::mapPort)
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(this::mapPort)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setOutputPorts(group.getOutputPorts().stream()
- .map(this::mapPort)
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(this::mapPort)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setLabels(group.getLabels().stream()
- .map(this::mapLabel)
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(this::mapLabel)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessors(group.getProcessors().stream()
- .map(processor -> mapProcessor(processor, serviceProvider,
includedGroupIds, externalControllerServiceReferences))
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(processor -> mapProcessor(processor, serviceProvider,
includedGroupIds, externalControllerServiceReferences))
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream()
- .map(this::mapRemoteProcessGroup)
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(this::mapRemoteProcessGroup)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessGroups(group.getProcessGroups().stream()
- .map(grp -> mapGroup(grp, serviceProvider, registryClient, false,
mapDescendantVersionedFlows, includedGroupIds,
externalControllerServiceReferences))
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(grp -> mapGroup(grp, serviceProvider,
applyVersionControlInfo, false, includedGroupIds,
externalControllerServiceReferences))
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setConnections(group.getConnections().stream()
- .map(this::mapConnection)
- .collect(Collectors.toCollection(LinkedHashSet::new)));
+ .map(this::mapConnection)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey().getName(),
Map.Entry::getValue)));
+ .collect(Collectors.toMap(entry -> entry.getKey().getName(),
Map.Entry::getValue)));
if (topLevel) {
versionedGroup.setExternalControllerServiceReferences(externalControllerServiceReferences);
@@ -253,6 +252,42 @@ public class NiFiRegistryFlowMapper {
return versionedGroup;
}
+ private void populateReferencedAncestorVariables(final ProcessGroup group,
final VersionedProcessGroup versionedGroup) {
+ final Set<String> ancestorVariableNames = new HashSet<>();
+ populateVariableNames(group.getParent(), ancestorVariableNames);
+
+ final Map<String, String> implicitlyDefinedVariables = new HashMap<>();
+ for (final String variableName : ancestorVariableNames) {
+ final boolean isReferenced =
!group.getComponentsAffectedByVariable(variableName).isEmpty();
+ if (isReferenced) {
+ final String value =
group.getVariableRegistry().getVariableValue(variableName);
+ implicitlyDefinedVariables.put(variableName, value);
+ }
+ }
+
+ if (!implicitlyDefinedVariables.isEmpty()) {
+ // Merge the implicit variables with the explicitly defined
variables for the Process Group
+ // and set those as the Versioned Group's variables.
+ if (versionedGroup.getVariables() != null) {
+
implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
+ }
+
+ versionedGroup.setVariables(implicitlyDefinedVariables);
+ }
+ }
+
+ private void populateVariableNames(final ProcessGroup group, final
Set<String> variableNames) {
+ if (group == null) {
+ return;
+ }
+
+ group.getVariableRegistry().getVariableMap().keySet().stream()
+ .map(VariableDescriptor::getName)
+ .forEach(variableNames::add);
+
+ populateVariableNames(group.getParent(), variableNames);
+ }
+
private String getId(final Optional<String> currentVersionedId, final
String componentId) {
final String versionedId;
if (currentVersionedId.isPresent()) {
@@ -279,7 +314,7 @@ public class NiFiRegistryFlowMapper {
}
- private String getGroupId(final String groupId) {
+ public String getGroupId(final String groupId) {
return versionedComponentIds.get(groupId);
}
@@ -599,23 +634,39 @@ public class NiFiRegistryFlowMapper {
return batchSize;
}
- public VersionedParameterContext mapParameterContext(final
ParameterContext context) {
- if (context == null) {
- return null;
- }
-
- final Set<VersionedParameter> parameters =
context.getParameters().values().stream()
- .map(this::mapParameter)
- .collect(Collectors.toSet());
+ public Map<String, VersionedParameterContext> mapParameterContexts(final
ProcessGroup processGroup,
+ final
boolean mapDescendantVersionedFlows) {
+ // cannot use a set to enforce uniqueness of parameter contexts
because VersionedParameterContext in the
+ // registry data model doesn't currently implement hashcode/equals
based on context name
+ final Map<String, VersionedParameterContext> parameterContexts = new
HashMap<>();
+ mapParameterContexts(processGroup, mapDescendantVersionedFlows,
parameterContexts);
+ return parameterContexts;
+ }
- final VersionedParameterContext versionedContext = new
VersionedParameterContext();
- versionedContext.setName(context.getName());
- versionedContext.setParameters(parameters);
+ private void mapParameterContexts(final ProcessGroup processGroup, final
boolean mapDescendantVersionedFlows,
+ final Map<String,
VersionedParameterContext> parameterContexts) {
+ final ParameterContext parameterContext =
processGroup.getParameterContext();
+ if (parameterContext != null) {
+ // map this process group's parameter context and add to the
collection
+ final Set<VersionedParameter> parameters =
parameterContext.getParameters().values().stream()
+ .map(this::mapParameter)
+ .collect(Collectors.toSet());
+
+ final VersionedParameterContext versionedContext = new
VersionedParameterContext();
+ versionedContext.setName(parameterContext.getName());
+ versionedContext.setParameters(parameters);
+ parameterContexts.put(versionedContext.getName(),
versionedContext);
+ }
- return versionedContext;
+ for (final ProcessGroup child : processGroup.getProcessGroups()) {
+ // only include child process group parameter contexts if boolean
indicator is true or process group is unversioned
+ if (mapDescendantVersionedFlows ||
child.getVersionControlInformation() == null) {
+ mapParameterContexts(child, mapDescendantVersionedFlows,
parameterContexts);
+ }
+ }
}
- public VersionedParameter mapParameter(final Parameter parameter) {
+ private VersionedParameter mapParameter(final Parameter parameter) {
if (parameter == null) {
return null;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
index 0a6c932..8d32329 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
@@ -29,7 +29,6 @@ import
org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -170,7 +169,7 @@ public class MockSingleFlowRegistryClient implements
FlowRegistryClient {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final
VersionedFlow flow, final VersionedProcessGroup snapshot,
final
Map<String, ExternalControllerServiceReference> externalControllerServices,
- final
Collection<VersionedParameterContext> parameterContexts, final String comments,
+ final
Map<String, VersionedParameterContext> parameterContexts, final String comments,
final int
expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException
{
return null;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
new file mode 100644
index 0000000..3a67811
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
@@ -0,0 +1,780 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Positionable;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.flow.ComponentType;
+import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.PortType;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedControllerService;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFunnel;
+import org.apache.nifi.registry.flow.VersionedLabel;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.VersionedPort;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NiFiRegistryFlowMapperTest {
+
+ @Mock
+ private ExtensionManager extensionManager;
+ @Mock
+ private ControllerServiceProvider controllerServiceProvider;
+ @Mock
+ private FlowRegistryClient flowRegistryClient;
+
+    // Created in setup(): field initializers run when the test instance is constructed, which is before the
+    // Mockito runner has populated the @Mock fields, so building the mapper here would pass it a null
+    // ExtensionManager.
+    private NiFiRegistryFlowMapper flowMapper;
+
+    // shared counter used by the prepare* helpers to generate unique names, values and coordinates
+    private int counter = 1;
+
+    /**
+     * Creates the mapper under test with the mocked ExtensionManager and stubs the registry client so that
+     * any registry id resolves to a mocked FlowRegistry whose URL is "url".
+     */
+    @Before
+    public void setup() {
+        flowMapper = new NiFiRegistryFlowMapper(extensionManager);
+
+        final FlowRegistry flowRegistry = mock(FlowRegistry.class);
+        when(flowRegistryClient.getFlowRegistry(anyString())).thenReturn(flowRegistry);
+        when(flowRegistry.getURL()).thenReturn("url");
+    }
+
+    /**
+     * Maps the parameter contexts of a process group hierarchy with the map-descendant-versioned-flows
+     * indicator set to false and checks that only the context of the unversioned child is returned.
+     */
+    @Test
+    public void testMapParameterContextsExcludingVersionedDescendants() {
+        // deepest group: has a parameter context and is under version control
+        final ProcessGroup versionedGrandchild =
+                prepareProcessGroupWithParameterContext(Collections.emptyList(), true, true);
+        // middle group: has a parameter context and is not under version control
+        final ProcessGroup unversionedChild =
+                prepareProcessGroupWithParameterContext(Lists.newArrayList(versionedGrandchild), true, false);
+        // top group: no parameter context and not under version control
+        final ProcessGroup topGroup =
+                prepareProcessGroupWithParameterContext(Lists.newArrayList(unversionedChild), false, false);
+
+        // the unversioned child is traversed, but the versioned grandchild beneath it must be skipped
+        // because versioned descendants are excluded
+        final Map<String, VersionedParameterContext> mappedContexts =
+                flowMapper.mapParameterContexts(topGroup, false);
+
+        // only the child's parameter context is expected
+        assertEquals(1, mappedContexts.size());
+
+        final ParameterContext expectedContext = unversionedChild.getParameterContext();
+        verifyParameterContext(expectedContext, mappedContexts.get(expectedContext.getName()));
+    }
+
+    /**
+     * Maps the parameter contexts of a nested, fully version controlled hierarchy with the
+     * map-descendant-versioned-flows indicator set to true and checks that every context is returned.
+     */
+    @Test
+    public void testMapNestedParameterContexts() {
+        // deepest group: has a parameter context, version controlled
+        final ProcessGroup grandchild =
+                prepareProcessGroupWithParameterContext(Collections.emptyList(), true, true);
+        // middle group: no parameter context, version controlled
+        final ProcessGroup child =
+                prepareProcessGroupWithParameterContext(Lists.newArrayList(grandchild), false, true);
+        // top group: has a parameter context, version controlled
+        final ProcessGroup topGroup =
+                prepareProcessGroupWithParameterContext(Lists.newArrayList(child), true, true);
+
+        // version controlled descendants are traversed as well because the indicator is true
+        final Map<String, VersionedParameterContext> mappedContexts =
+                flowMapper.mapParameterContexts(topGroup, true);
+
+        // the top group and the grandchild each contribute one context
+        assertEquals(2, mappedContexts.size());
+
+        final String topContextName = topGroup.getParameterContext().getName();
+        final String grandchildContextName = grandchild.getParameterContext().getName();
+        verifyParameterContext(topGroup.getParameterContext(), mappedContexts.get(topContextName));
+        verifyParameterContext(grandchild.getParameterContext(), mappedContexts.get(grandchildContextName));
+    }
+
+    /**
+     * Maps a version controlled ProcessGroup that contains a version controlled child while excluding
+     * descendant versioned flows. The child is expected to be mapped with its VersionControlInformation
+     * rather than with its contents.
+     */
+    @Test
+    public void testMapVersionedProcessGroupsExcludingVersionedDescendants() {
+        // two version controlled groups, one nested in the other, each holding a single processor
+        final ProcessGroup nestedGroup = prepareProcessGroup(1, false, false, false,
+                false, false, null,
+                false, true, Collections.emptyList());
+        final ProcessGroup outerGroup = prepareProcessGroup(1, false, false, false,
+                false, false, null,
+                false, true, Lists.newArrayList(nestedGroup));
+
+        final List<ProcessGroup> descendants = Lists.newArrayList(nestedGroup);
+        when(outerGroup.findAllProcessGroups()).thenReturn(descendants);
+
+        // map without descending into versioned flows
+        final InstantiatedVersionedProcessGroup mappedOuterGroup =
+                flowMapper.mapProcessGroup(outerGroup, controllerServiceProvider, flowRegistryClient, false);
+        final VersionedProcessGroup mappedNestedGroup = mappedOuterGroup.getProcessGroups().iterator().next();
+
+        // the outer group must carry its own contents only
+        verifyVersionedProcessGroup(outerGroup, mappedOuterGroup, false, false);
+
+        // the nested versioned group must be present with version control information only
+        verifyVersionedProcessGroup(nestedGroup, mappedNestedGroup, true, false);
+    }
+
+    /**
+     * Maps a version controlled ProcessGroup model to a non-versioned VersionedProcessGroup; version
+     * information is ignored. Most element kinds are exercised here: labels, ports, processors, connections,
+     * funnels, nested process groups, remote process groups, controller services, external controller service
+     * references and variable registries.
+     */
+    @Test
+    public void testMapNonVersionedProcessGroups() {
+        // a controller service owned by some other process group id, so that it is treated as external
+        final ControllerServiceNode externalService = prepareControllerService(UUID.randomUUID().toString());
+
+        // three levels of process groups, innermost first
+        final ProcessGroup grandchild = prepareProcessGroup(0, false, true, false,
+                true, false, null,
+                true, false, Collections.emptyList());
+        final ProcessGroup child = prepareProcessGroup(1, true, false, false,
+                true, true, externalService,
+                true, true, Lists.newArrayList(grandchild));
+        final ProcessGroup topGroup = prepareProcessGroup(2, false, false, true,
+                false, true, null,
+                false, true, Lists.newArrayList(child));
+
+        final List<ProcessGroup> descendants = Lists.newArrayList(child, grandchild);
+        when(topGroup.findAllProcessGroups()).thenReturn(descendants);
+
+        // map the whole hierarchy
+        final InstantiatedVersionedProcessGroup mappedTopGroup =
+                flowMapper.mapNonVersionedProcessGroup(topGroup, controllerServiceProvider);
+
+        // check the mapped contents, descending into child groups
+        verifyVersionedProcessGroup(topGroup, mappedTopGroup, false, true);
+
+        // the external controller service must be exposed as an external reference
+        final Map<String, ExternalControllerServiceReference> externalReferences =
+                mappedTopGroup.getExternalControllerServiceReferences();
+        final String expectedReferenceKey = flowMapper.getGroupId(externalService.getIdentifier());
+        final ExternalControllerServiceReference externalReference = externalReferences.get(expectedReferenceKey);
+        assertNotNull(externalReference);
+        assertEquals(expectedReferenceKey, externalReference.getIdentifier());
+        assertEquals(externalService.getName(), externalReference.getName());
+    }
+
+    /**
+     * Builds a mocked ProcessGroup holding the given children, optionally with a mocked ParameterContext
+     * that contains three parameters and optionally with mocked VersionControlInformation.
+     */
+    private ProcessGroup prepareProcessGroupWithParameterContext(final List<ProcessGroup> childProcessGroups,
+                                                                 final boolean includeParameterContext,
+                                                                 final boolean isVersionControlled) {
+        final ProcessGroup group = mock(ProcessGroup.class);
+
+        if (includeParameterContext) {
+            final ParameterContext context = mock(ParameterContext.class);
+            when(group.getParameterContext()).thenReturn(context);
+
+            // the shared counter keeps generated names and values unique
+            final String contextName = "context" + (counter++);
+            when(context.getName()).thenReturn(contextName);
+
+            final Map<ParameterDescriptor, Parameter> parameters = Maps.newHashMap();
+            when(context.getParameters()).thenReturn(parameters);
+
+            // a non-sensitive parameter, a sensitive parameter and a sensitive parameter without a value
+            addParameter(parameters, "value" + (counter++), false);
+            addParameter(parameters, "value" + (counter++), true);
+            addParameter(parameters, null, true);
+        }
+
+        if (isVersionControlled) {
+            final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class);
+            when(group.getVersionControlInformation()).thenReturn(versionControlInformation);
+        }
+
+        final Set<ProcessGroup> children = Sets.newLinkedHashSet(childProcessGroups);
+        when(group.getProcessGroups()).thenReturn(children);
+
+        return group;
+    }
+
+    /**
+     * Puts a mocked Parameter with the given value into the map, keyed by a newly built ParameterDescriptor
+     * with the given sensitivity. The descriptor's name and description are suffixed with the shared counter.
+     */
+    private void addParameter(final Map<ParameterDescriptor, Parameter> parametersMap, final String value, final boolean isSensitive) {
+        final String name = "param" + (counter++);
+        final String description = "description" + (counter++);
+        final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
+                .name(name)
+                .description(description)
+                .sensitive(isSensitive)
+                .build();
+
+        final Parameter mockedParameter = mock(Parameter.class);
+        when(mockedParameter.getValue()).thenReturn(value);
+        when(mockedParameter.getDescriptor()).thenReturn(descriptor);
+
+        parametersMap.put(descriptor, mockedParameter);
+    }
+
+    /**
+     * Verifies that the given VersionedParameterContext has the same name as the given ParameterContext and
+     * that its parameters pair up one-to-one, by name, with the parameters of the context.
+     */
+    private void verifyParameterContext(final ParameterContext parameterContext, final VersionedParameterContext versionedParameterContext) {
+        assertEquals(parameterContext.getName(), versionedParameterContext.getName());
+
+        final Collection<Parameter> parameters = parameterContext.getParameters().values();
+        final Set<VersionedParameter> versionedParameters = versionedParameterContext.getParameters();
+        // first verify the number of parameters matches, otherwise a parameter that was never mapped
+        // would go unnoticed by the name matching below
+        assertEquals(parameters.size(), versionedParameters.size());
+
+        // parameter order is not deterministic - use unique names to map up matching parameters; matches are
+        // removed from a copy so the mapped context passed in by the caller is left untouched
+        final List<VersionedParameter> unmatchedVersionedParameters = Lists.newArrayList(versionedParameters);
+        for (final Parameter parameter : parameters) {
+            final String parameterName = parameter.getDescriptor().getName();
+            final Iterator<VersionedParameter> candidates = unmatchedVersionedParameters.iterator();
+            while (candidates.hasNext()) {
+                final VersionedParameter candidate = candidates.next();
+                if (candidate.getName().equals(parameterName)) {
+                    verifyParameter(candidate, parameter);
+                    candidates.remove();
+                    break;
+                }
+            }
+        }
+        assertTrue("Failed to match parameters by unique name", unmatchedVersionedParameters.isEmpty());
+    }
+
+    /**
+     * Verifies that the VersionedParameter mirrors the descriptor of the given Parameter. The value must be
+     * null when the descriptor is sensitive and must equal the parameter's value otherwise.
+     */
+    private void verifyParameter(final VersionedParameter versionedParameter, final Parameter parameter) {
+        final ParameterDescriptor descriptor = parameter.getDescriptor();
+
+        assertEquals(descriptor.getName(), versionedParameter.getName());
+        assertEquals(descriptor.getDescription(), versionedParameter.getDescription());
+        assertEquals(descriptor.isSensitive(), versionedParameter.isSensitive());
+
+        if (!descriptor.isSensitive()) {
+            assertEquals(parameter.getValue(), versionedParameter.getValue());
+        } else {
+            // sensitive parameters must come out with a null value
+            assertNull(versionedParameter.getValue());
+        }
+    }
+
+    /**
+     * Builds a mocked ProcessGroup with a random identifier, counter-based name, comments and position, and a
+     * mocked ParameterContext. The flags select which optional components are added; numProcessors of 2 yields
+     * two processors joined by one connection, 1 yields one processor connected to itself, and any other
+     * value yields no processors or connections.
+     */
+    private ProcessGroup prepareProcessGroup(final int numProcessors, final boolean includeFunnel, final boolean includePorts,
+                                             final boolean includeLabels, final boolean includeVariableRegistry,
+                                             final boolean includeControllerService,
+                                             final ControllerServiceNode externalControllerServiceNode,
+                                             final boolean includeRemoteProcessGroup, final boolean includeVersionControlInfo,
+                                             final List<ProcessGroup> childProcessGroups) {
+        final ProcessGroup group = mock(ProcessGroup.class);
+        final String groupId = UUID.randomUUID().toString();
+        when(group.getIdentifier()).thenReturn(groupId);
+        when(group.getProcessGroupIdentifier()).thenReturn(groupId);
+
+        // the shared counter makes the generated name, comments and coordinates unique
+        final String groupName = "group" + (counter++);
+        when(group.getName()).thenReturn(groupName);
+        final String groupComments = "comments" + (counter++);
+        when(group.getComments()).thenReturn(groupComments);
+        final Position groupPosition = new Position(counter++, counter++);
+        when(group.getPosition()).thenReturn(groupPosition);
+
+        final ParameterContext context = mock(ParameterContext.class);
+        when(group.getParameterContext()).thenReturn(context);
+        final String contextName = "context" + (counter++);
+        when(context.getName()).thenReturn(contextName);
+
+        // funnels
+        final Set<Funnel> funnelSet = Sets.newHashSet();
+        if (includeFunnel) {
+            funnelSet.add(prepareFunnel(groupId));
+        }
+        when(group.getFunnels()).thenReturn(funnelSet);
+
+        // input and output ports
+        final Set<Port> inPorts = Sets.newHashSet();
+        final Set<Port> outPorts = Sets.newHashSet();
+        if (includePorts) {
+            inPorts.add(preparePort(groupId, PortType.INPUT_PORT));
+            outPorts.add(preparePort(groupId, PortType.OUTPUT_PORT));
+        }
+        when(group.getInputPorts()).thenReturn(inPorts);
+        when(group.getOutputPorts()).thenReturn(outPorts);
+
+        // labels
+        final Set<Label> labelSet = Sets.newHashSet();
+        if (includeLabels) {
+            labelSet.add(prepareLabel(groupId));
+        }
+        when(group.getLabels()).thenReturn(labelSet);
+
+        // processors and the connections between them
+        final Set<ProcessorNode> processors = Sets.newLinkedHashSet();
+        final Set<Connection> connectionSet = Sets.newHashSet();
+        switch (numProcessors) {
+            case 2: {
+                // two processors, the first feeding the second
+                final ProcessorNode first = prepareProcessor(group, externalControllerServiceNode);
+                final ProcessorNode second = prepareProcessor(group, externalControllerServiceNode);
+                processors.add(first);
+                processors.add(second);
+                connectionSet.add(prepareConnection(first, second, group));
+                break;
+            }
+            case 1: {
+                // a single processor looping back onto itself
+                final ProcessorNode only = prepareProcessor(group, externalControllerServiceNode);
+                processors.add(only);
+                connectionSet.add(prepareConnection(only, only, group));
+                break;
+            }
+            default:
+                // any other count leaves the group without processors and connections
+                break;
+        }
+        when(group.getProcessors()).thenReturn(processors);
+        when(group.getConnections()).thenReturn(connectionSet);
+
+        // controller services
+        final Set<ControllerServiceNode> services = Sets.newHashSet();
+        if (includeControllerService) {
+            services.add(prepareControllerService(groupId));
+        }
+        when(group.getControllerServices(false)).thenReturn(services);
+
+        // variable registry
+        final ComponentVariableRegistry variableRegistry = mock(ComponentVariableRegistry.class);
+        when(group.getVariableRegistry()).thenReturn(variableRegistry);
+        final Map<VariableDescriptor, String> variables = Maps.newHashMap();
+        if (includeVariableRegistry) {
+            variables.putAll(prepareVariableRegistry());
+        }
+        when(variableRegistry.getVariableMap()).thenReturn(variables);
+
+        // remote process groups
+        final Set<RemoteProcessGroup> remoteGroups = Sets.newHashSet();
+        if (includeRemoteProcessGroup) {
+            remoteGroups.add(prepareRemoteProcessGroup(groupId));
+        }
+        when(group.getRemoteProcessGroups()).thenReturn(remoteGroups);
+
+        // version control information
+        if (includeVersionControlInfo) {
+            final VersionControlInformation versionControlInfo = prepareVersionControlInfo();
+            when(group.getVersionControlInformation()).thenReturn(versionControlInfo);
+        }
+
+        // nested process groups, kept in the order given
+        final Set<ProcessGroup> children = Sets.newLinkedHashSet(childProcessGroups);
+        when(group.getProcessGroups()).thenReturn(children);
+
+        return group;
+    }
+
+    /**
+     * Builds a mocked Funnel with identifiers and a position, belonging to the given process group id.
+     */
+    private Funnel prepareFunnel(final String processGroupId) {
+        final Funnel mockedFunnel = mock(Funnel.class);
+
+        prepareComponentAuthorizable(mockedFunnel, processGroupId);
+        preparePositionable(mockedFunnel);
+
+        return mockedFunnel;
+    }
+
+    /**
+     * Builds a mocked Port with identifiers, a position and a name, belonging to the given process group id.
+     * Its ConnectableType is the constant with the same name as the given PortType.
+     */
+    private Port preparePort(final String processGroupId, final PortType portType) {
+        final Port mockedPort = mock(Port.class);
+
+        prepareComponentAuthorizable(mockedPort, processGroupId);
+        preparePositionable(mockedPort);
+
+        final ConnectableType connectableType = ConnectableType.valueOf(portType.name());
+        prepareConnectable(mockedPort, connectableType);
+
+        return mockedPort;
+    }
+
+    /**
+     * Builds a mocked Label with identifiers, a position and a counter-based size, belonging to the given
+     * process group id.
+     */
+    private Label prepareLabel(final String processGroupId) {
+        final Label mockedLabel = mock(Label.class);
+
+        prepareComponentAuthorizable(mockedLabel, processGroupId);
+        preparePositionable(mockedLabel);
+
+        final Size labelSize = new Size(counter++, counter++);
+        when(mockedLabel.getSize()).thenReturn(labelSize);
+
+        return mockedLabel;
+    }
+
+    /**
+     * Builds a mocked ProcessorNode inside the given process group with a single property "propName" whose raw
+     * value is "propValue". When an external controller service node is supplied, the property is declared as
+     * identifying a ControllerService and the mocked provider resolves "propValue" to that node.
+     */
+    private ProcessorNode prepareProcessor(final ProcessGroup processGroup, final ControllerServiceNode externalControllerServiceNode) {
+        final ProcessorNode processor = mock(ProcessorNode.class);
+
+        final String owningGroupId = processGroup.getIdentifier();
+        prepareComponentAuthorizable(processor, owningGroupId);
+        preparePositionable(processor);
+        prepareConnectable(processor, ConnectableType.PROCESSOR);
+
+        when(processor.getProcessGroup()).thenReturn(processGroup);
+        when(processor.getAutoTerminatedRelationships()).thenReturn(Collections.emptySet());
+        when(processor.getBulletinLevel()).thenReturn(LogLevel.INFO);
+        when(processor.getExecutionNode()).thenReturn(ExecutionNode.ALL);
+        when(processor.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN);
+        when(processor.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
+
+        final String rawValue = "propValue";
+        final PropertyDescriptor.Builder descriptorBuilder = new PropertyDescriptor.Builder()
+                .name("propName")
+                .sensitive(false)
+                .displayName("displayName");
+        if (externalControllerServiceNode != null) {
+            // make the property reference a controller service that resolves to the external node
+            descriptorBuilder.identifiesControllerService(ControllerService.class);
+            when(controllerServiceProvider.getControllerServiceNode(rawValue)).thenReturn(externalControllerServiceNode);
+        }
+        final PropertyDescriptor descriptor = descriptorBuilder.build();
+
+        final PropertyConfiguration configuration = mock(PropertyConfiguration.class);
+        when(configuration.getRawValue()).thenReturn(rawValue);
+
+        final Map<PropertyDescriptor, PropertyConfiguration> propertyMap = Maps.newHashMap();
+        propertyMap.put(descriptor, configuration);
+        when(processor.getProperties()).thenReturn(propertyMap);
+        when(processor.getProperty(descriptor)).thenReturn(configuration);
+
+        return processor;
+    }
+
+    /**
+     * Builds a mocked Connection in the given process group that runs from the source processor to the
+     * destination processor, with one bend point, no relationships and a mocked queue that has no
+     * prioritizers, no load balancing and no compression.
+     */
+    private Connection prepareConnection(final ProcessorNode sourceProcessorNode, final ProcessorNode destProcessorNode,
+                                         final ProcessGroup processGroup) {
+        final Connection mockedConnection = mock(Connection.class);
+        when(mockedConnection.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        when(mockedConnection.getProcessGroup()).thenReturn(processGroup);
+        when(mockedConnection.getBendPoints()).thenReturn(Lists.newArrayList(new Position(counter++, counter++)));
+        when(mockedConnection.getRelationships()).thenReturn(Lists.newArrayList());
+
+        // queue settings
+        final FlowFileQueue queue = mock(FlowFileQueue.class);
+        when(mockedConnection.getFlowFileQueue()).thenReturn(queue);
+        when(queue.getPriorities()).thenReturn(Collections.emptyList());
+        when(queue.getLoadBalanceStrategy()).thenReturn(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE);
+        when(queue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS);
+
+        // both endpoints are processors
+        when(sourceProcessorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        when(mockedConnection.getSource()).thenReturn(sourceProcessorNode);
+        when(destProcessorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        when(mockedConnection.getDestination()).thenReturn(destProcessorNode);
+
+        return mockedConnection;
+    }
+
+    /**
+     * Builds a map containing one variable whose name and value are suffixed with the shared counter.
+     */
+    private Map<VariableDescriptor, String> prepareVariableRegistry() {
+        final String variableName = "variable" + (counter++);
+        final VariableDescriptor descriptor = new VariableDescriptor.Builder(variableName).build();
+        final String variableValue = "value" + (counter++);
+
+        final Map<VariableDescriptor, String> variables = Maps.newHashMap();
+        variables.put(descriptor, variableValue);
+        return variables;
+    }
+
+    /**
+     * Builds a mocked ControllerServiceNode belonging to the given process group id, with a counter-based name,
+     * a mocked bundle coordinate, a mocked implementation and no properties.
+     */
+    private ControllerServiceNode prepareControllerService(final String processGroupId) {
+        final ControllerServiceNode serviceNode = mock(ControllerServiceNode.class);
+        prepareComponentAuthorizable(serviceNode, processGroupId);
+
+        final String serviceName = "service" + (counter++);
+        when(serviceNode.getName()).thenReturn(serviceName);
+        when(serviceNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
+        when(serviceNode.getControllerServiceImplementation()).thenReturn(mock(ControllerService.class));
+        when(serviceNode.getProperties()).thenReturn(Collections.emptyMap());
+
+        return serviceNode;
+    }
+
+    /**
+     * Builds a mocked RemoteProcessGroup belonging to the given process group id, with a position, a
+     * counter-based name, the HTTP transport protocol and one mocked input port and one mocked output port.
+     */
+    private RemoteProcessGroup prepareRemoteProcessGroup(final String processGroupId) {
+        final RemoteProcessGroup remoteGroup = mock(RemoteProcessGroup.class);
+        prepareComponentAuthorizable(remoteGroup, processGroupId);
+        preparePositionable(remoteGroup);
+
+        final String remoteGroupName = "remote" + (counter++);
+        when(remoteGroup.getName()).thenReturn(remoteGroupName);
+        when(remoteGroup.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
+
+        // the ports are fully prepared before they are handed to the stubbing calls below
+        final RemoteGroupPort inputPort = prepareRemoteGroupPort(remoteGroup);
+        final RemoteGroupPort outputPort = prepareRemoteGroupPort(remoteGroup);
+        when(remoteGroup.getInputPorts()).thenReturn(Sets.newHashSet(inputPort));
+        when(remoteGroup.getOutputPorts()).thenReturn(Sets.newHashSet(outputPort));
+
+        return remoteGroup;
+    }
+
+    /**
+     * Builds a mocked RemoteGroupPort with a counter-based name that points back to the given remote process
+     * group; the remote group's identifier is used as the port's process group identifier.
+     */
+    private RemoteGroupPort prepareRemoteGroupPort(final RemoteProcessGroup remoteProcessGroup) {
+        final RemoteGroupPort port = mock(RemoteGroupPort.class);
+
+        final String owningGroupId = remoteProcessGroup.getIdentifier();
+        prepareComponentAuthorizable(port, owningGroupId);
+
+        final String portName = "remotePort" + (counter++);
+        when(port.getName()).thenReturn(portName);
+        when(port.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
+
+        return port;
+    }
+
+    /**
+     * Builds mocked VersionControlInformation with random registry, bucket and flow identifiers and a
+     * counter-based version.
+     */
+    private VersionControlInformation prepareVersionControlInfo() {
+        final VersionControlInformation info = mock(VersionControlInformation.class);
+
+        final String registryId = UUID.randomUUID().toString();
+        when(info.getRegistryIdentifier()).thenReturn(registryId);
+
+        final String bucketId = UUID.randomUUID().toString();
+        when(info.getBucketIdentifier()).thenReturn(bucketId);
+
+        final String flowId = UUID.randomUUID().toString();
+        when(info.getFlowIdentifier()).thenReturn(flowId);
+
+        when(info.getVersion()).thenReturn(counter++);
+
+        return info;
+    }
+
+    /**
+     * Stubs the given mock to report a random identifier and the given process group identifier.
+     */
+    private void prepareComponentAuthorizable(final ComponentAuthorizable authorizable, final String processGroupId) {
+        final String componentId = UUID.randomUUID().toString();
+        when(authorizable.getIdentifier()).thenReturn(componentId);
+        when(authorizable.getProcessGroupIdentifier()).thenReturn(processGroupId);
+    }
+
+    /**
+     * Stubs the given mock to report a position built from the next two values of the shared counter.
+     */
+    private void preparePositionable(final Positionable positionable) {
+        final Position position = new Position(counter++, counter++);
+        when(positionable.getPosition()).thenReturn(position);
+    }
+
+    /**
+     * Stubs the given mock to report a counter-based name and the given ConnectableType.
+     */
+    private void prepareConnectable(final Connectable connectable, final ConnectableType connectableType) {
+        final String connectableName = "connectable" + (counter++);
+        when(connectable.getName()).thenReturn(connectableName);
+        when(connectable.getConnectableType()).thenReturn(connectableType);
+    }
+
+ /**
+ * Verify the given VersionedProcessGroup was correctly mapped from the
given ProcessGroup, including child groups
+ * when boolean indicator is true. If expectVersionControlInfo boolean
indicator is true, the mapped group should
+ * only contain VersionControlInfo, otherwise it should instead contain
its full contents.
+ */
+ private void verifyVersionedProcessGroup(final ProcessGroup processGroup,
final VersionedProcessGroup versionedProcessGroup,
+ final boolean
expectVersionControlInfo, final boolean verifyChildProcessGroups) {
+ final String expectedGroupIdentifier =
flowMapper.getGroupId(processGroup.getProcessGroupIdentifier());
+
+ // verify process group fields
+ assertEquals(processGroup.getName(), versionedProcessGroup.getName());
+ assertEquals(flowMapper.getGroupId(processGroup.getIdentifier()),
versionedProcessGroup.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedProcessGroup.getGroupIdentifier());
+ assertEquals(processGroup.getComments(),
versionedProcessGroup.getComments());
+ assertEquals(processGroup.getPosition().getX(),
versionedProcessGroup.getPosition().getX(), 0);
+ assertEquals(processGroup.getPosition().getY(),
versionedProcessGroup.getPosition().getY(), 0);
+
+ final String expectedParameterContextName =
+ (processGroup.getParameterContext() != null ?
processGroup.getParameterContext().getName() : null);
+ assertEquals(expectedParameterContextName,
versionedProcessGroup.getParameterContextName());
+
+ // verify either version control info or full group contents
+ if (expectVersionControlInfo) {
+ final VersionControlInformation versionControlInfo =
processGroup.getVersionControlInformation();
+ final VersionedFlowCoordinates versionedFlowCoordinates =
versionedProcessGroup.getVersionedFlowCoordinates();
+ assertNotNull(versionedFlowCoordinates.getRegistryUrl());
+ assertEquals(versionControlInfo.getBucketIdentifier(),
versionedFlowCoordinates.getBucketId());
+ assertEquals(versionControlInfo.getFlowIdentifier(),
versionedFlowCoordinates.getFlowId());
+ assertEquals(versionControlInfo.getVersion(),
versionedFlowCoordinates.getVersion());
+ } else {
+ assertNull(versionedProcessGroup.getVersionedFlowCoordinates());
+
+ // verify funnels
+ final Set<Funnel> funnels = processGroup.getFunnels();
+ final Set<VersionedFunnel> versionedFunnels =
versionedProcessGroup.getFunnels();
+ if (funnels.isEmpty()) {
+ assertTrue(versionedFunnels.isEmpty());
+ } else {
+ final Funnel funnel = funnels.iterator().next();
+ final VersionedFunnel versionedFunnel =
versionedFunnels.iterator().next();
+
+ assertEquals(flowMapper.getGroupId(funnel.getIdentifier()),
versionedFunnel.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedFunnel.getGroupIdentifier());
+ assertEquals(funnel.getPosition().getX(),
versionedFunnel.getPosition().getX(), 0);
+ assertEquals(funnel.getPosition().getY(),
versionedFunnel.getPosition().getY(), 0);
+ }
+
+ // verify ports
+ verifyPorts(processGroup.getInputPorts(),
versionedProcessGroup.getInputPorts(), PortType.INPUT_PORT,
expectedGroupIdentifier);
+ verifyPorts(processGroup.getOutputPorts(),
versionedProcessGroup.getOutputPorts(), PortType.OUTPUT_PORT,
expectedGroupIdentifier);
+
+ // verify labels
+ final Set<Label> labels = processGroup.getLabels();
+ final Set<VersionedLabel> versionedLabels =
versionedProcessGroup.getLabels();
+ if (labels.isEmpty()) {
+ assertTrue(versionedLabels.isEmpty());
+ } else {
+ final Label label = labels.iterator().next();
+ final VersionedLabel versionedLabel =
versionedLabels.iterator().next();
+
+ assertEquals(flowMapper.getGroupId(label.getIdentifier()),
versionedLabel.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedLabel.getGroupIdentifier());
+ assertEquals(label.getPosition().getX(),
versionedLabel.getPosition().getX(), 0);
+ assertEquals(label.getPosition().getY(),
versionedLabel.getPosition().getY(), 0);
+ assertEquals(label.getSize().getHeight(),
versionedLabel.getHeight(), 0);
+ assertEquals(label.getSize().getWidth(),
versionedLabel.getWidth(), 0);
+ }
+
+ // verify processors
+ final Collection<ProcessorNode> processorNodes =
processGroup.getProcessors();
+ final Set<VersionedProcessor> versionedProcessors =
versionedProcessGroup.getProcessors();
+ // first verify the number of processors matches
+ assertEquals(processorNodes.size(), versionedProcessors.size());
+ // processor order is not deterministic - use unique names to
match up corresponding processors
+ final Iterator<ProcessorNode> processorNodesIterator =
processorNodes.iterator();
+ while (processorNodesIterator.hasNext()) {
+ final ProcessorNode processorNode =
processorNodesIterator.next();
+ final Iterator<VersionedProcessor> versionedProcessorIterator
= versionedProcessors.iterator();
+ while (versionedProcessorIterator.hasNext()) {
+ final VersionedProcessor versionedProcessor =
versionedProcessorIterator.next();
+ if
(versionedProcessor.getName().equals(processorNode.getName())) {
+ verifyProcessor(versionedProcessor, processorNode,
expectedGroupIdentifier);
+ versionedProcessorIterator.remove();
+ break;
+ }
+ }
+ }
+ assertTrue("Failed to match processors by unique name",
versionedProcessors.isEmpty());
+
+ // verify connections
+ final Set<Connection> connections = processGroup.getConnections();
+ final Set<VersionedConnection> versionedConnections =
versionedProcessGroup.getConnections();
+ if (connections.isEmpty()) {
+ assertTrue(versionedConnections.isEmpty());
+ } else {
+ final Connection connection = connections.iterator().next();
+ final VersionedConnection versionedConnection =
versionedConnections.iterator().next();
+
+ assertEquals(connection.getName(),
versionedConnection.getName());
+
assertEquals(flowMapper.getGroupId(connection.getIdentifier()),
versionedConnection.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedConnection.getGroupIdentifier());
+ assertEquals(connection.getBendPoints().get(0).getX(),
versionedConnection.getBends().get(0).getX(), 0);
+ assertEquals(connection.getBendPoints().get(0).getY(),
versionedConnection.getBends().get(0).getY(), 0);
+
assertEquals(flowMapper.getGroupId(connection.getSource().getIdentifier()),
versionedConnection.getSource().getId());
+ assertEquals(expectedGroupIdentifier,
versionedConnection.getSource().getGroupId());
+ assertEquals(connection.getSource().getName(),
versionedConnection.getSource().getName());
+
assertEquals(connection.getSource().getConnectableType().name(),
versionedConnection.getSource().getType().name());
+
assertEquals(flowMapper.getGroupId(connection.getDestination().getIdentifier()),
versionedConnection.getDestination().getId());
+ assertEquals(expectedGroupIdentifier,
versionedConnection.getDestination().getGroupId());
+ assertEquals(connection.getDestination().getName(),
versionedConnection.getDestination().getName());
+
assertEquals(connection.getDestination().getConnectableType().name(),
versionedConnection.getDestination().getType().name());
+ }
+
+ // verify controller services
+ final Set<ControllerServiceNode> controllerServiceNodes =
processGroup.getControllerServices(false);
+ final Set<VersionedControllerService> versionedControllerServices
= versionedProcessGroup.getControllerServices();
+ if (controllerServiceNodes.isEmpty()) {
+ assertTrue(versionedControllerServices.isEmpty());
+ } else {
+ final ControllerServiceNode controllerServiceNode =
controllerServiceNodes.iterator().next();
+ final VersionedControllerService versionedControllerService =
versionedControllerServices.iterator().next();
+
+ assertEquals(controllerServiceNode.getName(),
versionedControllerService.getName());
+
assertEquals(flowMapper.getGroupId(controllerServiceNode.getIdentifier()),
versionedControllerService.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedControllerService.getGroupIdentifier());
+ }
+
+ // verify variables
+ final Map<VariableDescriptor, String> variableRegistryMap =
processGroup.getVariableRegistry().getVariableMap();
+ final Map<String, String> versionedVariableMap =
versionedProcessGroup.getVariables();
+ if (variableRegistryMap.isEmpty()) {
+ assertTrue(versionedVariableMap.isEmpty());
+ } else {
+ final VariableDescriptor variableRegistryKey =
variableRegistryMap.keySet().iterator().next();
+ assertEquals(variableRegistryMap.get(variableRegistryKey),
versionedVariableMap.get(variableRegistryKey.getName()));
+ }
+
+ // verify remote process group(s)
+ final Set<RemoteProcessGroup> remoteProcessGroups =
processGroup.getRemoteProcessGroups();
+ final Set<VersionedRemoteProcessGroup>
versionedRemoteProcessGroups = versionedProcessGroup.getRemoteProcessGroups();
+ if (remoteProcessGroups.isEmpty()) {
+ assertTrue(versionedRemoteProcessGroups.isEmpty());
+ } else {
+ final RemoteProcessGroup remoteProcessGroup =
remoteProcessGroups.iterator().next();
+ final VersionedRemoteProcessGroup versionedRemoteProcessGroup
= versionedRemoteProcessGroups.iterator().next();
+
+ assertEquals(remoteProcessGroup.getName(),
versionedRemoteProcessGroup.getName());
+
assertEquals(flowMapper.getGroupId(remoteProcessGroup.getIdentifier()),
versionedRemoteProcessGroup.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedRemoteProcessGroup.getGroupIdentifier());
+ assertEquals(remoteProcessGroup.getPosition().getX(),
versionedRemoteProcessGroup.getPosition().getX(), 0);
+ assertEquals(remoteProcessGroup.getPosition().getY(),
versionedRemoteProcessGroup.getPosition().getY(), 0);
+
+ // verify remote ports
+ final String expectedPortGroupIdentifier =
flowMapper.getGroupId(remoteProcessGroup.getIdentifier());
+ verifyRemotePorts(remoteProcessGroup.getInputPorts(),
versionedRemoteProcessGroup.getInputPorts(),
+ ComponentType.REMOTE_INPUT_PORT,
expectedPortGroupIdentifier);
+ verifyRemotePorts(remoteProcessGroup.getOutputPorts(),
versionedRemoteProcessGroup.getOutputPorts(),
+ ComponentType.REMOTE_OUTPUT_PORT,
expectedPortGroupIdentifier);
+ }
+
+ if (verifyChildProcessGroups) {
+ // recurse to verify inner process group(s)
+ final Set<ProcessGroup> innerProcessGroups =
processGroup.getProcessGroups();
+ final Set<VersionedProcessGroup> innerVersionedProcessGroups =
versionedProcessGroup.getProcessGroups();
+ if (innerProcessGroups.isEmpty()) {
+ assertTrue(innerVersionedProcessGroups.isEmpty());
+ } else {
+ final ProcessGroup innerProcessGroup =
innerProcessGroups.iterator().next();
+ final VersionedProcessGroup innerVersionedProcessGroup =
innerVersionedProcessGroups.iterator().next();
+
+ verifyVersionedProcessGroup(innerProcessGroup,
innerVersionedProcessGroup, expectVersionControlInfo,
+ verifyChildProcessGroups);
+ }
+ }
+ }
+ }
+
+ private void verifyPorts(final Set<Port> ports, final Set<VersionedPort>
versionedPorts, final PortType portType,
+ final String expectedGroupIdentifier) {
+ if (ports.isEmpty()) {
+ assertTrue(versionedPorts.isEmpty());
+ } else {
+ final Port port = ports.iterator().next();
+ final VersionedPort versionedPort =
versionedPorts.iterator().next();
+
+ assertEquals(flowMapper.getGroupId(port.getIdentifier()),
versionedPort.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedPort.getGroupIdentifier());
+ assertEquals(port.getPosition().getX(),
versionedPort.getPosition().getX(), 0);
+ assertEquals(port.getPosition().getY(),
versionedPort.getPosition().getY(), 0);
+ assertEquals(port.getName(), versionedPort.getName());
+ assertEquals(portType, versionedPort.getType());
+ }
+ }
+
+ private void verifyRemotePorts(final Set<RemoteGroupPort> remotePorts,
+ final Set<VersionedRemoteGroupPort>
versionedRemotePorts,
+ final ComponentType componentType, final
String expectedPortGroupIdentifier) {
+ if (remotePorts.isEmpty()) {
+ assertTrue(versionedRemotePorts.isEmpty());
+ } else {
+ final RemoteGroupPort remotePort = remotePorts.iterator().next();
+ final VersionedRemoteGroupPort versionedRemotePort =
versionedRemotePorts.iterator().next();
+
+ assertEquals(flowMapper.getGroupId(remotePort.getIdentifier()),
versionedRemotePort.getIdentifier());
+ assertEquals(expectedPortGroupIdentifier,
versionedRemotePort.getGroupIdentifier());
+ assertEquals(remotePort.getName(), versionedRemotePort.getName());
+ assertEquals(componentType,
versionedRemotePort.getComponentType());
+ }
+ }
+
+ private void verifyProcessor(final VersionedProcessor versionedProcessor,
final ProcessorNode processorNode,
+ final String expectedGroupIdentifier) {
+ assertEquals(processorNode.getName(), versionedProcessor.getName());
+ assertEquals(flowMapper.getGroupId(processorNode.getIdentifier()),
versionedProcessor.getIdentifier());
+ assertEquals(expectedGroupIdentifier,
versionedProcessor.getGroupIdentifier());
+ assertEquals(processorNode.getPosition().getX(),
versionedProcessor.getPosition().getX(), 0);
+ assertEquals(processorNode.getPosition().getY(),
versionedProcessor.getPosition().getY(), 0);
+
+ final PropertyDescriptor propertyDescriptor =
processorNode.getProperties().keySet().iterator().next();
+ final VersionedPropertyDescriptor versionedPropertyDescriptor =
+
versionedProcessor.getPropertyDescriptors().get(propertyDescriptor.getName());
+
assertTrue(versionedProcessor.getProperties().containsKey(propertyDescriptor.getName()));
+ assertNotNull(versionedPropertyDescriptor);
+ assertEquals(propertyDescriptor.getName(),
versionedPropertyDescriptor.getName());
+ assertEquals(propertyDescriptor.getDisplayName(),
versionedPropertyDescriptor.getDisplayName());
+ assertEquals(propertyDescriptor.isSensitive(),
versionedPropertyDescriptor.isSensitive());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 86ec526..7e59744 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1457,15 +1457,17 @@ public interface NiFiServiceFacade {
* @param snapshot the Snapshot to persist
* @param externalControllerServiceReferences a mapping of controller
service id to ExternalControllerServiceReference for any Controller Service
that is referenced in the flow but not included
* in the VersionedProcessGroup
- * @oaram ParameterContext the parameter contexts to include
+ * @param parameterContexts a map of the Parameter Contexts to include
keyed by name
* @param comments about the snapshot
* @param expectedVersion the version to save the flow as
* @return the snapshot that represents what was stored in the registry
*
* @throws NiFiCoreException if unable to register the snapshot with the
flow registry
*/
- VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId,
VersionedFlow flow, VersionedProcessGroup snapshot,
Collection<VersionedParameterContext> parameterContexts,
- Map<String,
ExternalControllerServiceReference> externalControllerServiceReferences, String
comments, int expectedVersion);
+ VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId,
VersionedFlow flow, VersionedProcessGroup snapshot,
+ Map<String,
VersionedParameterContext> parameterContexts,
+ Map<String,
ExternalControllerServiceReference> externalControllerServiceReferences,
+ String comments, int
expectedVersion);
/**
* Updates the Version Control Information on the Process Group with the
given ID
@@ -1502,6 +1504,24 @@ public interface NiFiServiceFacade {
VersionedFlowSnapshot
getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo,
boolean fetchRemoteFlows);
/**
+ * Get the Versioned Flow Snapshot from the registry for the version of the
flow that the Process Group with the given ID is currently tracking
+ *
+ * @param processGroupId the ID of the Process Group
+ * @return the tracked version of the Versioned Flow Snapshot for download
+ *
+ * @throws ResourceNotFoundException if the Versioned Flow Snapshot could
not be found
+ */
+ VersionedFlowSnapshot getVersionedFlowSnapshotByGroupId(String
processGroupId);
+
+ /**
+ * Get the current state of the Process Group with the given ID, converted
to a Versioned Flow Snapshot
+ *
+ * @param processGroupId the ID of the Process Group
+ * @return the current Process Group converted to a Versioned Flow
Snapshot for download
+ */
+ VersionedFlowSnapshot getCurrentFlowSnapshotByGroupId(String
processGroupId);
+
+ /**
* Returns the name of the Flow Registry that is registered with the given
ID. If no Flow Registry exists with the given ID, will return
* the ID itself as the name
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 6532f51..1b5bb54 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -107,6 +107,7 @@ import
org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConfigurableComponent;
@@ -2511,7 +2512,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
// import the template
final Template template = templateDAO.importTemplate(templateDTO,
groupId);
- // save the flow
+
controllerFacade.save();
// return the template dto
@@ -4304,7 +4305,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
// Create a VersionedProcessGroup snapshot of the flow as it is
currently.
final InstantiatedVersionedProcessGroup versionedProcessGroup =
createFlowSnapshot(groupId);
- final Collection<VersionedParameterContext> parameterContexts =
createVersionedParameterContexts(processGroup);
+ final Map<String, VersionedParameterContext> parameterContexts =
createVersionedParameterContexts(processGroup);
final String flowId = versionedFlowDto.getFlowId() == null ?
UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
@@ -4396,6 +4397,76 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
+ public VersionedFlowSnapshot getCurrentFlowSnapshotByGroupId(final String
processGroupId) {
+ final ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
+ final VersionControlInformation versionControlInfo =
processGroup.getVersionControlInformation();
+
+ // Create a complete (include descendant flows) VersionedProcessGroup
snapshot of the flow as it is
+ // currently without any registry related fields populated, even if
the flow is currently versioned.
+ final NiFiRegistryFlowMapper mapper =
makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final InstantiatedVersionedProcessGroup nonVersionedProcessGroup =
+ mapper.mapNonVersionedProcessGroup(processGroup,
controllerFacade.getControllerServiceProvider());
+
+ // Create a complete (include descendant flows) map of parameter
contexts
+ final Map<String, VersionedParameterContext> parameterContexts =
+ mapper.mapParameterContexts(processGroup, true);
+
+ final VersionedFlowSnapshot nonVersionedFlowSnapshot = new
VersionedFlowSnapshot();
+ nonVersionedFlowSnapshot.setFlowContents(nonVersionedProcessGroup);
+
nonVersionedFlowSnapshot.setExternalControllerServices(nonVersionedProcessGroup.getExternalControllerServiceReferences());
+ nonVersionedFlowSnapshot.setParameterContexts(parameterContexts);
+
nonVersionedFlowSnapshot.setFlowEncodingVersion(RestBasedFlowRegistry.FLOW_ENCODING_VERSION);
+
+ return nonVersionedFlowSnapshot;
+ }
+
+ @Override
+ public VersionedFlowSnapshot getVersionedFlowSnapshotByGroupId(final
String processGroupId) {
+ final ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
+ final VersionControlInformation versionControlInfo =
processGroup.getVersionControlInformation();
+
+ return
getVersionedFlowSnapshot(versionControlInfo.getRegistryIdentifier(),
versionControlInfo.getBucketIdentifier(),
+ versionControlInfo.getFlowIdentifier(),
versionControlInfo.getVersion(), true);
+ }
+
+ @Override
+ public VersionedFlowSnapshot getVersionedFlowSnapshot(final
VersionControlInformationDTO versionControlInfo, final boolean
fetchRemoteFlows) {
+ return getVersionedFlowSnapshot(versionControlInfo.getRegistryId(),
versionControlInfo.getBucketId(), versionControlInfo.getFlowId(),
+ versionControlInfo.getVersion(), fetchRemoteFlows);
+ }
+
+ /**
+ * Retrieves a versioned flow snapshot from the identified Flow Registry,
chaining the underlying registry or I/O failure as the cause of any exception
raised while fetching it
+ * @param registryId the id of the registry to retrieve the
versioned flow from
+ * @param bucketId the id of the bucket within the registry
+ * @param flowId the id of the flow within the bucket/registry
+ * @param flowVersion the version of the flow to retrieve
+ * @param fetchRemoteFlows indicator to include remote flows when
retrieving the flow
+ * @return a VersionedFlowSnapshot from a registry with the given version
+ */
+ private VersionedFlowSnapshot getVersionedFlowSnapshot(final String
registryId, final String bucketId, final String flowId,
+ final Integer
flowVersion, final boolean fetchRemoteFlows) {
+ final FlowRegistry flowRegistry =
flowRegistryClient.getFlowRegistry(registryId);
+ if (flowRegistry == null) {
+ throw new ResourceNotFoundException("Could not find any Flow
Registry registered with identifier " + registryId);
+ }
+
+ final VersionedFlowSnapshot snapshot;
+ try {
+ snapshot = flowRegistry.getFlowContents(bucketId, flowId,
flowVersion, fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
+ } catch (final NiFiRegistryException e) {
+ logger.error(e.getMessage(), e);
+ throw new IllegalArgumentException("The Flow Registry with ID " +
registryId + " reports that no Flow exists with Bucket "
+ + bucketId + ", Flow " + flowId + ", Version " +
flowVersion, e);
+ } catch (final IOException ioe) {
+ throw new IllegalStateException(
+ "Failed to communicate with Flow Registry when attempting
to retrieve a versioned flow", ioe);
+ }
+
+ return snapshot;
+ }
+
+ @Override
public VersionedFlow deleteVersionedFlow(final String registryId, final
String bucketId, final String flowId) {
final FlowRegistry registry =
flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
@@ -4424,30 +4495,14 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
private InstantiatedVersionedProcessGroup createFlowSnapshot(final String
processGroupId) {
final ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
- final NiFiRegistryFlowMapper mapper = new
NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final NiFiRegistryFlowMapper mapper =
makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final InstantiatedVersionedProcessGroup versionedGroup =
mapper.mapProcessGroup(processGroup,
controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
return versionedGroup;
}
- private Collection<VersionedParameterContext>
createVersionedParameterContexts(final ProcessGroup processGroup) {
- final NiFiRegistryFlowMapper mapper = new
NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
- final Collection<VersionedParameterContext> parameterContexts = new
ArrayList<>();
- createVersionedParameterContexts(processGroup, mapper,
parameterContexts);
- return parameterContexts;
- }
-
- private void createVersionedParameterContexts(final ProcessGroup
processGroup, final NiFiRegistryFlowMapper mapper, final
Collection<VersionedParameterContext> contextCollection) {
- final ParameterContext parameterContext =
processGroup.getParameterContext();
- if (parameterContext != null) {
- final VersionedParameterContext versionedContext =
mapper.mapParameterContext(processGroup.getParameterContext());
- contextCollection.add(versionedContext);
- }
-
- for (final ProcessGroup child : processGroup.getProcessGroups()) {
- if (child.getVersionControlInformation() == null) {
- createVersionedParameterContexts(child, mapper,
contextCollection);
- }
- }
+ private Map<String, VersionedParameterContext>
createVersionedParameterContexts(final ProcessGroup processGroup) {
+ final NiFiRegistryFlowMapper mapper =
makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ return mapper.mapParameterContexts(processGroup, false);
}
@Override
@@ -4472,7 +4527,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
throw new NiFiCoreException("Failed to retrieve flow with Flow
Registry in order to calculate local differences due to " + e.getMessage(), e);
}
- final NiFiRegistryFlowMapper mapper = new
NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final NiFiRegistryFlowMapper mapper =
makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final VersionedProcessGroup localGroup =
mapper.mapProcessGroup(processGroup,
controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final VersionedProcessGroup registryGroup =
versionedFlowSnapshot.getFlowContents();
@@ -4540,7 +4595,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String
registryId, final VersionedFlow flow, final VersionedProcessGroup snapshot,
- final
Collection<VersionedParameterContext> parameterContexts,
+ final
Map<String, VersionedParameterContext> parameterContexts,
final
Map<String, ExternalControllerServiceReference>
externalControllerServiceReferences, final String comments,
final int
expectedVersion) {
final FlowRegistry registry =
flowRegistryClient.getFlowRegistry(registryId);
@@ -4608,7 +4663,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
public Set<AffectedComponentEntity>
getComponentsAffectedByVersionChange(final String processGroupId, final
VersionedFlowSnapshot updatedSnapshot) {
final ProcessGroup group =
processGroupDAO.getProcessGroup(processGroupId);
- final NiFiRegistryFlowMapper mapper = new
NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final NiFiRegistryFlowMapper mapper =
makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final VersionedProcessGroup localContents =
mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(),
flowRegistryClient, true);
final ComparableDataFlow localFlow = new
StandardComparableDataFlow("Local Flow", localContents);
@@ -4923,25 +4978,6 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public VersionedFlowSnapshot getVersionedFlowSnapshot(final
VersionControlInformationDTO versionControlInfo, final boolean
fetchRemoteFlows) {
- final FlowRegistry flowRegistry =
flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
- if (flowRegistry == null) {
- throw new ResourceNotFoundException("Could not find any Flow
Registry registered with identifier " + versionControlInfo.getRegistryId());
- }
-
- final VersionedFlowSnapshot snapshot;
- try {
- snapshot =
flowRegistry.getFlowContents(versionControlInfo.getBucketId(),
versionControlInfo.getFlowId(), versionControlInfo.getVersion(),
fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
- } catch (final NiFiRegistryException | IOException e) {
- logger.error(e.getMessage(), e);
- throw new IllegalArgumentException("The Flow Registry with ID " +
versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
- + versionControlInfo.getBucketId() + ", Flow " +
versionControlInfo.getFlowId() + ", Version " +
versionControlInfo.getVersion());
- }
-
- return snapshot;
- }
-
- @Override
public String getFlowRegistryName(final String flowRegistryId) {
final FlowRegistry flowRegistry =
flowRegistryClient.getFlowRegistry(flowRegistryId);
return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
@@ -5397,6 +5433,16 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
outputPortDAO.verifyPublicPortUniqueness(portId, portName);
}
+ /**
+ * Create a new flow mapper using a mockable method for testing
+ *
+ * @param extensionManager the extension manager to create the flow
mapper with
+ * @return a new NiFiRegistryFlowMapper instance
+ */
+ protected NiFiRegistryFlowMapper makeNiFiRegistryFlowMapper(final
ExtensionManager extensionManager) {
+ return new NiFiRegistryFlowMapper(extensionManager);
+ }
+
/* setters */
public void setProperties(final NiFiProperties properties) {
this.properties = properties;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 1a73350..dcc34a1 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -512,18 +512,20 @@ public abstract class ApplicationResource {
/**
* Authorizes the specified process group.
*
- * @param processGroupAuthorizable process group
- * @param authorizer authorizer
- * @param lookup lookup
- * @param action action
- * @param authorizeReferencedServices whether to authorize referenced
services
- * @param authorizeTemplates whether to authorize templates
- * @param authorizeControllerServices whether to authorize controller
services
+ * @param processGroupAuthorizable process group
+ * @param authorizer authorizer
+ * @param lookup lookup
+ * @param action action
+ * @param authorizeReferencedServices whether to authorize referenced
services
+ * @param authorizeTemplates whether to authorize templates
+ * @param authorizeControllerServices whether to authorize controller
services
+ * @param authorizeTransitiveServices whether to authorize transitive
services
+ * @param authorizeParameterReferences whether to authorize parameter
references
*/
protected void authorizeProcessGroup(final ProcessGroupAuthorizable
processGroupAuthorizable, final Authorizer authorizer, final AuthorizableLookup
lookup, final RequestAction action,
final boolean
authorizeReferencedServices, final boolean authorizeTemplates,
final boolean
authorizeControllerServices, final boolean authorizeTransitiveServices,
- final boolean
authorizeParamterReferences) {
+ final boolean
authorizeParameterReferences) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Consumer<Authorizable> authorize = authorizable ->
authorizable.authorize(authorizer, action, user);
@@ -542,7 +544,7 @@ public abstract class ApplicationResource {
}
// authorize any referenced parameters if necessary
- if (authorizeParamterReferences) {
+ if (authorizeParameterReferences) {
AuthorizeParameterReference.authorizeParameterReferences(processorAuthorizable,
authorizer, processorAuthorizable.getParameterContext(), user);
}
});
@@ -570,7 +572,7 @@ public abstract class ApplicationResource {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(controllerServiceAuthorizable,
authorizer, lookup, authorizeTransitiveServices);
}
- if (authorizeParamterReferences) {
+ if (authorizeParameterReferences) {
AuthorizeParameterReference.authorizeParameterReferences(controllerServiceAuthorizable,
authorizer, controllerServiceAuthorizable.getParameterContext(), user);
}
});
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 89bb313..142e25c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -54,6 +54,7 @@ import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -132,6 +133,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
@@ -309,6 +311,49 @@ public class ProcessGroupResource extends
ApplicationResource {
return generateOkResponse(entity).build();
}
+ /**
+ * Retrieves the specified group as a versioned flow snapshot for download.
+ *
+ * @param groupId The id of the process group
+ * @return A response containing the versioned flow snapshot of the group,
sent as a JSON file attachment.
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/download")
+ @ApiOperation(
+ value = "Gets a process group for download",
+ response = String.class,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}")
+ }
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the
request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make
this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not
be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was
not in the appropriate state to process it. Retrying the same request later may
be successful.")
+ })
+ public Response exportProcessGroup(@ApiParam(value = "The process group
id.", required = true) @PathParam("id") final String groupId) {
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ // ensure access to process groups (nested), encapsulated
controller services and referenced parameter contexts
+ final ProcessGroupAuthorizable groupAuthorizable =
lookup.getProcessGroup(groupId);
+ authorizeProcessGroup(groupAuthorizable, authorizer, lookup,
RequestAction.READ, true,
+ false, true, false, true);
+ });
+
+ // get the current state of the flow as a versioned flow snapshot
+ final VersionedFlowSnapshot currentVersionedFlowSnapshot =
serviceFacade.getCurrentFlowSnapshotByGroupId(groupId);
+
+ // determine the name of the attachment - possible issues with spaces
in file names
+ final VersionedProcessGroup currentVersionedProcessGroup =
currentVersionedFlowSnapshot.getFlowContents();
+ final String flowName = currentVersionedProcessGroup.getName();
+ final String filename = flowName.replaceAll("\\s", "_") + ".json";
+
+ return
generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION,
String.format("attachment; filename=\"%s\"", filename)).build();
+ }
/**
* Retrieves a list of local modifications to the Process Group since it
was last synchronized with the Flow Registry
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 8aec9c9..7816f58 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -92,6 +92,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
@@ -174,6 +175,66 @@ public class VersionsResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
+    /**
+     * Exports the version-controlled snapshot of a Process Group as a JSON attachment. Registry-specific
+     * details (flow, bucket, snapshot metadata and nested versioned flow coordinates) are cleared first.
+     *
+     * @param groupId the process group id
+     * @return a response carrying the sanitized snapshot with a Content-Disposition attachment header
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("process-groups/{id}/download")
+    @ApiOperation(
+        value = "Gets the latest version of a Process Group for download",
+        response = String.class,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}")
+        }
+    )
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response exportFlowVersion(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId) {
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+            // ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts
+            authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true,
+                    false, true, false, true);
+        });
+
+        // get the versioned flow
+        final VersionedFlowSnapshot versionedFlowSnapshot = serviceFacade.getVersionedFlowSnapshotByGroupId(groupId);
+
+        // capture the name and version before the snapshot metadata is cleared below
+        final VersionedProcessGroup versionedProcessGroup = versionedFlowSnapshot.getFlowContents();
+        final String flowName = versionedProcessGroup.getName();
+        final int flowVersion = versionedFlowSnapshot.getSnapshotMetadata().getVersion();
+
+        // clear top-level registry data which doesn't belong in versioned flow download
+        versionedFlowSnapshot.setFlow(null);
+        versionedFlowSnapshot.setBucket(null);
+        versionedFlowSnapshot.setSnapshotMetadata(null);
+
+        // clear nested process group registry data which doesn't belong in versioned flow download
+        sanitizeRegistryInfo(versionedProcessGroup);
+
+        // determine the name of the attachment - whitespace, double quotes and backslashes are replaced because
+        // the name is embedded in a quoted filename="..." parameter of the Content-Disposition header
+        final String filename = flowName.replaceAll("[\\s\"\\\\]", "_") + "_" + flowVersion + ".json";
+
+        return generateOkResponse(versionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build();
+    }
+
+    /**
+     * Strips the versioned flow coordinates from the supplied process group and then, depth-first,
+     * from every process group nested beneath it.
+     *
+     * @param versionedProcessGroup the root of the process group tree to clean
+     */
+    private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGroup) {
+        versionedProcessGroup.setVersionedFlowCoordinates(null);
+        versionedProcessGroup.getProcessGroups().forEach(this::sanitizeRegistryInfo);
+    }
@POST
@Consumes(MediaType.APPLICATION_JSON)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 3c567eb..0b9cb7f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.web;
+import com.google.common.collect.Maps;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
@@ -33,14 +34,26 @@ import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.authorization.user.StandardNiFiUser.Builder;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.controller.ControllerFacade;
+import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Before;
import org.junit.Test;
@@ -49,6 +62,8 @@ import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -59,6 +74,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -78,6 +94,8 @@ public class StandardNiFiServiceFacadeTest {
private StandardNiFiServiceFacade serviceFacade;
private Authorizer authorizer;
+ private FlowController flowController;
+ private ProcessGroupDAO processGroupDAO;
@Before
public void setUp() throws Exception {
@@ -155,13 +173,13 @@ public class StandardNiFiServiceFacadeTest {
});
// flow controller
- final FlowController controller = mock(FlowController.class);
- when(controller.getResource()).thenCallRealMethod();
- when(controller.getParentAuthorizable()).thenCallRealMethod();
+ flowController = mock(FlowController.class);
+ when(flowController.getResource()).thenCallRealMethod();
+ when(flowController.getParentAuthorizable()).thenCallRealMethod();
// controller facade
final ControllerFacade controllerFacade = new ControllerFacade();
- controllerFacade.setFlowController(controller);
+ controllerFacade.setFlowController(flowController);
serviceFacade = new StandardNiFiServiceFacade();
serviceFacade.setAuditService(auditService);
@@ -277,4 +295,56 @@ public class StandardNiFiServiceFacadeTest {
});
}
+    /**
+     * Verifies that the current flow snapshot is assembled from the mapper output (flow contents, parameter
+     * contexts, external controller services, encoding version) and carries no registry flow/bucket/metadata.
+     */
+    @Test
+    public void testGetCurrentFlowSnapshotByGroupId() {
+        final String groupId = UUID.randomUUID().toString();
+        final ProcessGroup processGroup = mock(ProcessGroup.class);
+
+        // populate the fixture field instead of shadowing it with a local of the same name
+        processGroupDAO = mock(ProcessGroupDAO.class);
+        serviceFacade.setProcessGroupDAO(processGroupDAO);
+        when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
+
+        final FlowManager flowManager = mock(FlowManager.class);
+        final ExtensionManager extensionManager = mock(ExtensionManager.class);
+        when(flowController.getFlowManager()).thenReturn(flowManager);
+        when(flowController.getExtensionManager()).thenReturn(extensionManager);
+
+        final ControllerServiceProvider controllerServiceProvider = mock(ControllerServiceProvider.class);
+        when(flowController.getControllerServiceProvider()).thenReturn(controllerServiceProvider);
+
+        final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class);
+        when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
+
+        // use spy to mock the make() method for generating a new flow mapper to make this testable
+        final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
+        final NiFiRegistryFlowMapper flowMapper = mock(NiFiRegistryFlowMapper.class);
+        when(serviceFacadeSpy.makeNiFiRegistryFlowMapper(extensionManager)).thenReturn(flowMapper);
+
+        final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mock(InstantiatedVersionedProcessGroup.class);
+        when(flowMapper.mapNonVersionedProcessGroup(processGroup, controllerServiceProvider)).thenReturn(nonVersionedProcessGroup);
+
+        final String parameterName = "foo";
+        final VersionedParameterContext versionedParameterContext = mock(VersionedParameterContext.class);
+        when(versionedParameterContext.getName()).thenReturn(parameterName);
+        final Map<String, VersionedParameterContext> parameterContexts = Maps.newHashMap();
+        parameterContexts.put(parameterName, versionedParameterContext);
+        when(flowMapper.mapParameterContexts(processGroup, true)).thenReturn(parameterContexts);
+
+        final ExternalControllerServiceReference externalControllerServiceReference = mock(ExternalControllerServiceReference.class);
+        final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = Maps.newHashMap();
+        externalControllerServiceReferences.put("test", externalControllerServiceReference);
+        when(nonVersionedProcessGroup.getExternalControllerServiceReferences()).thenReturn(externalControllerServiceReferences);
+
+        final VersionedFlowSnapshot versionedFlowSnapshot = serviceFacadeSpy.getCurrentFlowSnapshotByGroupId(groupId);
+
+        assertEquals(nonVersionedProcessGroup, versionedFlowSnapshot.getFlowContents());
+        assertEquals(1, versionedFlowSnapshot.getParameterContexts().size());
+        assertEquals(versionedParameterContext, versionedFlowSnapshot.getParameterContexts().get(parameterName));
+        assertEquals(externalControllerServiceReferences, versionedFlowSnapshot.getExternalControllerServices());
+        assertEquals(RestBasedFlowRegistry.FLOW_ENCODING_VERSION, versionedFlowSnapshot.getFlowEncodingVersion());
+        // registry-related info must not be present on a snapshot of the current flow
+        assertNull(versionedFlowSnapshot.getFlow());
+        assertNull(versionedFlowSnapshot.getBucket());
+        assertNull(versionedFlowSnapshot.getSnapshotMetadata());
+    }
+
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java
new file mode 100644
index 0000000..f0fc27c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.junit.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the flow download endpoint of {@link ProcessGroupResource}.
+ */
+public class TestProcessGroupResource {
+
+    /**
+     * The export endpoint must return the snapshot supplied by the service facade and mark it as a
+     * JSON attachment named after the flow.
+     */
+    @Test
+    public void testExportProcessGroup() {
+        final String groupId = UUID.randomUUID().toString();
+        final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
+        final VersionedFlowSnapshot versionedFlowSnapshot = mock(VersionedFlowSnapshot.class);
+
+        when(serviceFacade.getCurrentFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
+
+        final String flowName = "flowname";
+        final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class);
+        when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup);
+        when(versionedProcessGroup.getName()).thenReturn(flowName);
+
+        final ProcessGroupResource resource = getProcessGroupResource(serviceFacade);
+
+        final Response response = resource.exportProcessGroup(groupId);
+
+        final VersionedFlowSnapshot resultEntity = (VersionedFlowSnapshot)response.getEntity();
+
+        assertEquals(200, response.getStatus());
+        assertEquals(versionedFlowSnapshot, resultEntity);
+        // the attachment name is derived from the flow name
+        assertEquals(String.format("attachment; filename=\"%s.json\"", flowName), response.getHeaderString("Content-Disposition"));
+    }
+
+    // builds a resource wired only with the mocked service facade
+    private ProcessGroupResource getProcessGroupResource(final NiFiServiceFacade serviceFacade) {
+        final ProcessGroupResource resource = new ProcessGroupResource();
+        resource.setServiceFacade(serviceFacade);
+        return resource;
+    }
+
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
new file mode 100644
index 0000000..3958c9e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import com.google.common.collect.Sets;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.junit.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the versioned flow download endpoint of {@link VersionsResource}.
+ */
+public class TestVersionsResource {
+
+    /**
+     * The export endpoint must return the snapshot supplied by the service facade, clear registry info
+     * at every nesting level, and mark the response as a JSON attachment named after flow and version.
+     */
+    @Test
+    public void testExportFlowVersion() {
+        final String groupId = UUID.randomUUID().toString();
+        final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
+        final VersionedFlowSnapshot versionedFlowSnapshot = mock(VersionedFlowSnapshot.class);
+
+        when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
+
+        final String flowName = "flowname";
+        final int flowVersion = 1;
+        final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class);
+        final VersionedFlowSnapshotMetadata snapshotMetadata = mock(VersionedFlowSnapshotMetadata.class);
+        when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup);
+        when(versionedProcessGroup.getName()).thenReturn(flowName);
+        when(versionedFlowSnapshot.getSnapshotMetadata()).thenReturn(snapshotMetadata);
+        when(snapshotMetadata.getVersion()).thenReturn(flowVersion);
+
+        // two levels of nesting so the recursive sanitization is exercised
+        final VersionedProcessGroup innerVersionedProcessGroup = mock(VersionedProcessGroup.class);
+        final VersionedProcessGroup innerInnerVersionedProcessGroup = mock(VersionedProcessGroup.class);
+        when(versionedProcessGroup.getProcessGroups()).thenReturn(Sets.newHashSet(innerVersionedProcessGroup));
+        when(innerVersionedProcessGroup.getProcessGroups()).thenReturn(Sets.newHashSet(innerInnerVersionedProcessGroup));
+
+        final VersionsResource resource = getVersionsResource(serviceFacade);
+
+        final Response response = resource.exportFlowVersion(groupId);
+
+        final VersionedFlowSnapshot resultEntity = (VersionedFlowSnapshot)response.getEntity();
+
+        assertEquals(200, response.getStatus());
+        assertEquals(versionedFlowSnapshot, resultEntity);
+        // the attachment name is derived from the flow name and version
+        assertEquals(String.format("attachment; filename=\"%s_%d.json\"", flowName, flowVersion), response.getHeaderString("Content-Disposition"));
+
+        verify(versionedFlowSnapshot).setFlow(null);
+        verify(versionedFlowSnapshot).setBucket(null);
+        verify(versionedFlowSnapshot).setSnapshotMetadata(null);
+        verify(versionedProcessGroup).setVersionedFlowCoordinates(null);
+        verify(innerVersionedProcessGroup).setVersionedFlowCoordinates(null);
+        verify(innerInnerVersionedProcessGroup).setVersionedFlowCoordinates(null);
+    }
+
+    // builds a resource wired only with the mocked service facade
+    private VersionsResource getVersionsResource(final NiFiServiceFacade serviceFacade) {
+        final VersionsResource resource = new VersionsResource();
+        resource.setServiceFacade(serviceFacade);
+        return resource;
+    }
+
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/otp/OtpAuthenticationFilter.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/otp/OtpAuthenticationFilter.java
index 34883fc..98d8e68 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/otp/OtpAuthenticationFilter.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/otp/OtpAuthenticationFilter.java
@@ -36,6 +36,8 @@ public class OtpAuthenticationFilter extends
NiFiAuthenticationFilter {
Pattern.compile("/flowfile-queues/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content");
private static final Pattern TEMPLATE_DOWNLOAD_PATTERN =
Pattern.compile("/templates/[a-f0-9\\-]{36}/download");
+ private static final Pattern FLOW_DOWNLOAD_PATTERN =
+ Pattern.compile("/process-groups/[a-f0-9\\-]{36}/download");
protected static final String ACCESS_TOKEN = "access_token";
@@ -69,7 +71,8 @@ public class OtpAuthenticationFilter extends
NiFiAuthenticationFilter {
}
private boolean isDownloadRequest(final String pathInfo) {
- return PROVENANCE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() ||
QUEUE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() ||
TEMPLATE_DOWNLOAD_PATTERN.matcher(pathInfo).matches();
+ return PROVENANCE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() ||
QUEUE_DOWNLOAD_PATTERN.matcher(pathInfo).matches()
+ || TEMPLATE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() ||
FLOW_DOWNLOAD_PATTERN.matcher(pathInfo).matches();
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index ec3e254..e71fde4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -150,7 +150,8 @@
urls: {
api: '../nifi-api',
controller: '../nifi-api/controller',
- parameterContexts: '../nifi-api/parameter-contexts'
+ parameterContexts: '../nifi-api/parameter-contexts',
+ downloadToken: '../nifi-api/access/download-token'
}
};
@@ -1378,6 +1379,45 @@
},
/**
+ * Downloads the current flow
+ */
+        downloadFlow: function (selection) {
+            // resolve the target: the current group when nothing is selected,
+            // otherwise the single selected process group
+            var targetGroupId = null;
+            if (selection.empty()) {
+                targetGroupId = nfCanvasUtils.getGroupId();
+            } else if (selection.size() === 1 && nfCanvasUtils.isProcessGroup(selection)) {
+                targetGroupId = selection.datum().id;
+            }
+
+            // nothing downloadable in the current selection
+            if (targetGroupId === null) {
+                return;
+            }
+
+            // opens the download url, appending the token only when one was issued
+            var openDownload = function (downloadToken) {
+                var downloadUri = '../nifi-api/process-groups/' + encodeURIComponent(targetGroupId) + '/download';
+                if (!nfCommon.isBlank(downloadToken)) {
+                    downloadUri += ('?' + $.param({'access_token': downloadToken}));
+                }
+                window.open(downloadUri);
+            };
+
+            // informs the user that no download token could be obtained
+            var reportTokenFailure = function () {
+                nfDialog.showOkDialog({
+                    headerText: 'Download Flow',
+                    dialogContent: 'Unable to generate access token for downloading content.'
+                });
+            };
+
+            nfCommon.getAccessToken(config.urls.downloadToken).done(openDownload).fail(reportTokenFailure);
+        },
+
+ /**
* Disconnects a Process Group from flow versioning.
*/
stopVersionControl: function (selection) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index 78e3e94..2699f6d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -430,6 +430,18 @@
};
/**
+ * Returns whether the process group supports downloading the current flow.
+ *
+ * @param selection
+ * @returns {boolean}
+ */
+    var supportsDownloadFlow = function (selection) {
+        // with nothing selected, the download targets the current group, which must be readable
+        var currentGroupReadable = selection.empty() && nfCanvasUtils.canReadCurrentGroup();
+        if (currentGroupReadable) {
+            return currentGroupReadable;
+        }
+
+        // otherwise exactly one readable process group must be selected
+        return selection.size() === 1 && nfCanvasUtils.isProcessGroup(selection) && nfCanvasUtils.canRead(selection);
+    };
+
+ /**
* Determines whether the current selection supports flow versioning.
*
* @param selection
@@ -494,36 +506,11 @@
* @returns {boolean}
*/
var supportsCommitFlowVersion = function (selection) {
- // ensure this selection supports flow versioning above
- if (supportsFlowVersioning(selection) === false) {
- return false;
- }
-
- var versionControlInformation;
- if (selection.empty()) {
- // check bread crumbs for version control information in the
current group
- var breadcrumbEntities =
nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
- if (breadcrumbEntities.length > 0) {
- var breadcrumbEntity =
breadcrumbEntities[breadcrumbEntities.length - 1];
- if (breadcrumbEntity.permissions.canRead) {
- versionControlInformation =
breadcrumbEntity.breadcrumb.versionControlInformation;
- } else {
- return false;
- }
- } else {
- return false;
- }
- } else {
- var processGroupData = selection.datum();
- versionControlInformation =
processGroupData.component.versionControlInformation;
- }
-
- if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
- return false;
- }
+ var versionControlInformation =
getFlowVersionControlInformation(selection);
// check the selection for version control information
- return versionControlInformation.state === 'LOCALLY_MODIFIED';
+ return versionControlInformation !== null &&
+ versionControlInformation.state === 'LOCALLY_MODIFIED';
};
/**
@@ -533,36 +520,11 @@
* @returns {boolean}
*/
var supportsForceCommitFlowVersion = function (selection) {
- // ensure this selection supports flow versioning above
- if (supportsFlowVersioning(selection) === false) {
- return false;
- }
-
- var versionControlInformation;
- if (selection.empty()) {
- // check bread crumbs for version control information in the
current group
- var breadcrumbEntities =
nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
- if (breadcrumbEntities.length > 0) {
- var breadcrumbEntity =
breadcrumbEntities[breadcrumbEntities.length - 1];
- if (breadcrumbEntity.permissions.canRead) {
- versionControlInformation =
breadcrumbEntity.breadcrumb.versionControlInformation;
- } else {
- return false;
- }
- } else {
- return false;
- }
- } else {
- var processGroupData = selection.datum();
- versionControlInformation =
processGroupData.component.versionControlInformation;
- }
-
- if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
- return false;
- }
+ var versionControlInformation =
getFlowVersionControlInformation(selection);
// check the selection for version control information
- return versionControlInformation.state ===
'LOCALLY_MODIFIED_AND_STALE';
+ return versionControlInformation !== null &&
+ versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
};
@@ -573,36 +535,12 @@
* @returns {boolean}
*/
var hasLocalChanges = function (selection) {
- // ensure this selection supports flow versioning above
- if (supportsFlowVersioning(selection) === false) {
- return false;
- }
-
- var versionControlInformation;
- if (selection.empty()) {
- // check bread crumbs for version control information in the
current group
- var breadcrumbEntities =
nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
- if (breadcrumbEntities.length > 0) {
- var breadcrumbEntity =
breadcrumbEntities[breadcrumbEntities.length - 1];
- if (breadcrumbEntity.permissions.canRead) {
- versionControlInformation =
breadcrumbEntity.breadcrumb.versionControlInformation;
- } else {
- return false;
- }
- } else {
- return false;
- }
- } else {
- var processGroupData = selection.datum();
- versionControlInformation =
processGroupData.component.versionControlInformation;
- }
-
- if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
- return false;
- }
+ var versionControlInformation =
getFlowVersionControlInformation(selection);
// check the selection for version control information
- return versionControlInformation.state === 'LOCALLY_MODIFIED' ||
versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
+ return versionControlInformation !== null &&
+ (versionControlInformation.state === 'LOCALLY_MODIFIED' ||
+ versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE');
};
/**
@@ -612,36 +550,10 @@
* @returns {boolean}
*/
var supportsChangeFlowVersion = function (selection) {
- // ensure this selection supports flow versioning above
- if (supportsFlowVersioning(selection) === false) {
- return false;
- }
-
- var versionControlInformation;
- if (selection.empty()) {
- // check bread crumbs for version control information in the
current group
- var breadcrumbEntities =
nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
- if (breadcrumbEntities.length > 0) {
- var breadcrumbEntity =
breadcrumbEntities[breadcrumbEntities.length - 1];
- if (breadcrumbEntity.permissions.canRead) {
- versionControlInformation =
breadcrumbEntity.breadcrumb.versionControlInformation;
- } else {
- return false;
- }
- } else {
- return false;
- }
- } else {
- var processGroupData = selection.datum();
- versionControlInformation =
processGroupData.component.versionControlInformation;
- }
-
- if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
- return false;
- }
+ var versionControlInformation =
getFlowVersionControlInformation(selection);
- // check the selection for version control information
- return versionControlInformation.state !== 'LOCALLY_MODIFIED' &&
+ return versionControlInformation !== null &&
+ versionControlInformation.state !== 'LOCALLY_MODIFIED' &&
versionControlInformation.state !== 'LOCALLY_MODIFIED_AND_STALE' &&
versionControlInformation.state !== 'SYNC_FAILURE';
};
@@ -650,32 +562,51 @@
* Determines whether the current selection supports stopping flow
versioning.
*
* @param selection
+ * @returns {boolean}
*/
var supportsStopFlowVersioning = function (selection) {
- // ensure this selection supports flow versioning above
+ var versionControlInformation =
getFlowVersionControlInformation(selection);
+
+ return versionControlInformation !== null;
+ };
+
+    /**
+     * Convenience function to perform all flow versioning pre-checks and retrieve
+     * valid version information.
+     *
+     * @param selection
+     * @returns {object|null} the version control information, or null when versioning is unsupported,
+     *                        the current group is not readable, or no version control information exists
+     */
+    var getFlowVersionControlInformation = function (selection) {
+        // ensure this selection supports flow versioning
         if (supportsFlowVersioning(selection) === false) {
-            return false;
+            return null;
         }
+        var versionControlInformation;
         if (selection.empty()) {
             // check bread crumbs for version control information in the current group
             var breadcrumbEntities = nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
             if (breadcrumbEntities.length > 0) {
                 var breadcrumbEntity = breadcrumbEntities[breadcrumbEntities.length - 1];
                 if (breadcrumbEntity.permissions.canRead) {
-                    return nfCommon.isDefinedAndNotNull(breadcrumbEntity.breadcrumb.versionControlInformation);
+                    versionControlInformation = breadcrumbEntity.breadcrumb.versionControlInformation;
                 } else {
-                    return false;
+                    return null;
                 }
             } else {
-                return false;
+                return null;
             }
+        } else {
+            var processGroupData = selection.datum();
+            versionControlInformation = processGroupData.component.versionControlInformation;
         }
-        // check the selection for version control information
-        var processGroupData = selection.datum();
-        return nfCommon.isDefinedAndNotNull(processGroupData.component.versionControlInformation);
-    };
+        if (nfCommon.isDefinedAndNotNull(versionControlInformation)) {
+            return versionControlInformation;
+        }
+
+        return null;
+    };
/**
* Determines whether the current selection could have provenance.
@@ -910,6 +841,8 @@
{id: 'move-into-parent-menu-item', condition: canMoveToParent,
menuItem: {clazz: 'fa fa-arrows', text: 'Move to parent group', action:
'moveIntoParent'}},
{id: 'group-menu-item', condition: canGroup, menuItem: {clazz: 'icon
icon-group', text: 'Group', action: 'group'}},
{separator: true},
+ {id: 'download-menu-item', condition: supportsDownloadFlow, menuItem:
{clazz: 'fa', text: 'Download flow', action: 'downloadFlow'}},
+ {separator: true},
{id: 'upload-template-menu-item', condition: canUploadTemplate,
menuItem: {clazz: 'icon icon-template-import', text: 'Upload template', action:
'uploadTemplate'}},
{id: 'template-menu-item', condition: canCreateTemplate, menuItem:
{clazz: 'icon icon-template-save', text: 'Create template', action:
'template'}},
{separator: true},