http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java new file mode 100644 index 0000000..828b970 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class StandardFlowRegistryClient implements FlowRegistryClient { + private ConcurrentMap<String, FlowRegistry> registryById = new ConcurrentHashMap<>(); + + @Override + public FlowRegistry getFlowRegistry(String registryId) { + return registryById.get(registryId); + } + + @Override + public Set<String> getRegistryIdentifiers() { + return registryById.keySet(); + } + + @Override + public void addFlowRegistry(final FlowRegistry registry) { + final FlowRegistry existing = registryById.putIfAbsent(registry.getIdentifier(), registry); + if (existing != null) { + throw new IllegalStateException("Cannot add Flow Registry " + registry + " because a Flow Registry already exists with the ID " + registry.getIdentifier()); + } + } + + @Override + public FlowRegistry addFlowRegistry(final String registryId, final String registryName, final String registryUrl, final String description) { + final URI uri = URI.create(registryUrl); + final String uriScheme = uri.getScheme(); + + final FlowRegistry registry; + if (uriScheme.equalsIgnoreCase("file")) { + try { + registry = new FileBasedFlowRegistry(registryId, registryUrl); + } catch (IOException e) { + throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl, e); + } + + registry.setName(registryName); + registry.setDescription(description); + } else { + throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl + + " because there are no known implementations of Flow Registries that can handle URIs of scheme " + uriScheme); + } + + addFlowRegistry(registry); + return registry; + } + + @Override + public FlowRegistry removeFlowRegistry(final String registryId) { + return registryById.remove(registryId); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index e3edc30..a75d112 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -28,10 +28,12 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.ClassUtils; import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; @@ -46,6 +48,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.flow.BatchSize; import org.apache.nifi.registry.flow.Bundle; import org.apache.nifi.registry.flow.ComponentType; @@ -56,13 +59,14 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.PortType; import org.apache.nifi.registry.flow.Position; -import org.apache.nifi.registry.flow.RemoteFlowCoordinates; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedFunnel; import org.apache.nifi.registry.flow.VersionedLabel; import org.apache.nifi.registry.flow.VersionedPort; +import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; @@ -78,9 +82,91 @@ public class NiFiRegistryFlowMapper { public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) { versionedComponentIds.clear(); - return mapGroup(group, registryClient, true); + final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true); + + // TODO: Test that this works properly + populateReferencedAncestorServices(group, mapped); + + // TODO: Test that this works properly + populateReferencedAncestorVariables(group, mapped); + + return mapped; + } + + private void populateReferencedAncestorServices(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { + final Set<ControllerServiceNode> ancestorControllerServices = group.getControllerServices(true); + ancestorControllerServices.remove(group.getControllerServices(false)); + final Map<String, ControllerServiceNode> ancestorServicesById = ancestorControllerServices.stream() + .collect(Collectors.toMap(ControllerServiceNode::getIdentifier, Function.identity())); + + final Set<ControllerServiceNode> referenced = new HashSet<>(); + + for (final ProcessorNode processor : group.findAllProcessors()) { + findReferencedServices(processor, ancestorServicesById, referenced); + } + + for (final ControllerServiceNode service : group.findAllControllerServices()) { + findReferencedServices(service, ancestorServicesById, referenced); + } + + final Set<VersionedControllerService> versionedServices = referenced.stream().map(this::mapControllerService) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + versionedGroup.getControllerServices().addAll(versionedServices); + } + + private Set<ControllerServiceNode> findReferencedServices(final ConfiguredComponent component, final Map<String, ControllerServiceNode> ancestorServicesById, + final Set<ControllerServiceNode> referenced) { + + for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.getControllerServiceDefinition() != null) { + final String serviceId = entry.getValue(); + final ControllerServiceNode serviceNode = ancestorServicesById.get(serviceId); + if (serviceNode != null) { + referenced.add(serviceNode); + referenced.addAll(findReferencedServices(serviceNode, ancestorServicesById, referenced)); + } + } + } + + return referenced; + } + + private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { + final Set<String> ancestorVariableNames = new HashSet<>(); + populateVariableNames(group.getParent(), ancestorVariableNames); + + final Map<String, String> implicitlyDefinedVariables = new HashMap<>(); + for (final String variableName : ancestorVariableNames) { + final boolean isReferenced = !group.getComponentsAffectedByVariable(variableName).isEmpty(); + if (isReferenced) { + final String value = group.getVariableRegistry().getVariableValue(variableName); + implicitlyDefinedVariables.put(variableName, value); + } + } + + if (!implicitlyDefinedVariables.isEmpty()) { + // Merge the implicit variables with the explicitly defined variables for the Process Group + // and set those as the Versioned Group's variables. + implicitlyDefinedVariables.putAll(versionedGroup.getVariables()); + versionedGroup.setVariables(implicitlyDefinedVariables); + } + } + + private void populateVariableNames(final ProcessGroup group, final Set<String> variableNames) { + if (group == null) { + return; + } + + group.getVariableRegistry().getVariableMap().keySet().stream() + .map(VariableDescriptor::getName) + .forEach(variableNames::add); + + populateVariableNames(group.getParent(), variableNames); } + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) { final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); @@ -95,7 +181,7 @@ public class NiFiRegistryFlowMapper { if (!topLevel) { final VersionControlInformation versionControlInfo = group.getVersionControlInformation(); if (versionControlInfo != null) { - final RemoteFlowCoordinates coordinates = new RemoteFlowCoordinates(); + final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates(); final String registryId = versionControlInfo.getRegistryIdentifier(); final FlowRegistry registry = registryClient.getFlowRegistry(registryId); if (registry == null) { @@ -237,6 +323,7 @@ public class NiFiRegistryFlowMapper { private Map<String, String> mapProperties(final ConfiguredComponent component) { final Map<String, String> mapped = new HashMap<>(); component.getProperties().keySet().stream() + .filter(property -> !property.isSensitive()) .forEach(property -> { String value = component.getProperty(property); if (value == null) { @@ -312,7 +399,7 @@ public class NiFiRegistryFlowMapper { versionedPort.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks()); versionedPort.setName(port.getName()); versionedPort.setPosition(mapPosition(port.getPosition())); - versionedPort.setType(PortType.valueOf(port.getComponentType())); + versionedPort.setType(PortType.valueOf(port.getConnectableType().name())); return versionedPort; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java index eda045a..807691f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java @@ -19,6 +19,7 @@ package org.apache.nifi.util; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.web.api.dto.BundleDTO; import java.util.List; @@ -28,7 +29,6 @@ import java.util.stream.Collectors; * Utility class for Bundles. */ public final class BundleUtils { - private static BundleCoordinate findBundleForType(final String type, final BundleCoordinate desiredCoordinate) { final List<Bundle> bundles = ExtensionManager.getBundles(type); if (bundles.isEmpty()) { @@ -140,4 +140,50 @@ public final class BundleUtils { } } + + /** + * Discovers the compatible bundle details for the components in the specified Versioned Process Group and updates the Versioned Process Group + * to reflect the appropriate bundles. + * + * @param versionedGroup the versioned group + */ + public static void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) { + if (versionedGroup.getProcessors() != null) { + versionedGroup.getProcessors().forEach(processor -> { + final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle())); + + final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle(); + bundle.setArtifact(coordinate.getId()); + bundle.setGroup(coordinate.getGroup()); + bundle.setVersion(coordinate.getVersion()); + processor.setBundle(bundle); + }); + } + + if (versionedGroup.getControllerServices() != null) { + versionedGroup.getControllerServices().forEach(controllerService -> { + final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle())); + + final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle(); + bundle.setArtifact(coordinate.getId()); + bundle.setGroup(coordinate.getGroup()); + bundle.setVersion(coordinate.getVersion()); + controllerService.setBundle(bundle); + }); + } + + if (versionedGroup.getProcessGroups() != null) { + versionedGroup.getProcessGroups().forEach(processGroup -> { + discoverCompatibleBundles(processGroup); + }); + } + } + + public static BundleDTO createBundleDto(final org.apache.nifi.registry.flow.Bundle bundle) { + final BundleDTO dto = new BundleDTO(); + dto.setArtifact(bundle.getArtifact()); + dto.setGroup(dto.getGroup()); + dto.setVersion(dto.getVersion()); + return dto; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 8186c8b..8954f39 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -26,6 +26,8 @@ </xs:sequence> </xs:choice> + <xs:element name="registries" type="RegistriesType" minOccurs="0" maxOccurs="1" /> + <!-- Groupings of Processors/Ports --> <xs:element name="rootGroup" type="RootProcessGroupType" /> @@ -38,6 +40,21 @@ <xs:attribute name="encoding-version" type="xs:string"/> </xs:complexType> + <xs:complexType name="RegistriesType"> + <xs:sequence> + <xs:element name="flowRegistry" type="FlowRegistryType" minOccurs="0" maxOccurs="unbounded" /> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="FlowRegistryType"> + <xs:sequence> + <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="name" type="NonEmptyStringType" /> + <xs:element name="url" type="NonEmptyStringType" /> + <xs:element name="description" type="NonEmptyStringType" /> + </xs:sequence> + </xs:complexType> + <!-- the processor "id" is a key that should be valid within each flowController--> <xs:complexType name="ProcessorType"> <xs:sequence> http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index 6a3ec8b..fc42c62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -36,9 +36,7 @@ </bean> <!-- flow registry --> - <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.FileBasedFlowRegistryClient"> - <constructor-arg index="0" type="java.io.File" value="../flowRegistry" /> - </bean> + <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient" /> <!-- flow controller --> <bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean"> http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 18dc51b..db4ac59 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -654,7 +654,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty) { + public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) { } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java index ca3e95f..31f1fbe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java @@ -32,6 +32,8 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; +import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.connectable.Position; @@ -208,9 +210,11 @@ public class FingerprintFactoryTest { when(component.getProxyPort()).thenReturn(null); when(component.getProxyUser()).thenReturn(null); when(component.getProxyPassword()).thenReturn(null); + when(component.getVersionedComponentId()).thenReturn(Optional.empty()); // Assert fingerprints with expected one. final String expected = "id" + + "NO_VALUE" + "http://node1:8080/nifi, http://node2:8080/nifi" + "eth0" + "10 sec" + @@ -245,9 +249,11 @@ public class FingerprintFactoryTest { when(component.getProxyPort()).thenReturn(3128); when(component.getProxyUser()).thenReturn("proxy-user"); when(component.getProxyPassword()).thenReturn("proxy-pass"); + when(component.getVersionedComponentId()).thenReturn(Optional.empty()); // Assert fingerprints with expected one. final String expected = "id" + + "NO_VALUE" + "http://node1:8080/nifi, http://node2:8080/nifi" + "NO_VALUE" + "10 sec" + @@ -273,6 +279,7 @@ public class FingerprintFactoryTest { when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi"); when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW); + when(groupComponent.getVersionedComponentId()).thenReturn(Optional.empty()); final RemoteGroupPort portComponent = mock(RemoteGroupPort.class); when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent)); @@ -288,6 +295,7 @@ public class FingerprintFactoryTest { when(portComponent.getBatchDuration()).thenReturn("10sec"); // Serializer doesn't serialize if a port doesn't have any connection. when(portComponent.hasIncomingConnection()).thenReturn(true); + when(portComponent.getVersionedComponentId()).thenReturn(Optional.empty()); // Assert fingerprints with expected one. final String expected = "portId" + http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java new file mode 100644 index 0000000..b1da06a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.util.Tuple; +import org.apache.nifi.web.NiFiCoreException; +import org.apache.nifi.web.api.dto.BundleDTO; + +public class FlowRegistryUtils { + + public static boolean containsRestrictedComponent(final VersionedProcessGroup group) { + final Set<Tuple<String, BundleCoordinate>> componentTypes = new HashSet<>(); + populateComponentTypes(group, componentTypes); + + for (final Tuple<String, BundleCoordinate> tuple : componentTypes) { + final ConfigurableComponent component = ExtensionManager.getTempComponent(tuple.getKey(), tuple.getValue()); + if (component == null) { + throw new NiFiCoreException("Could not create an instance of component " + tuple.getKey() + " using bundle coordinates " + tuple.getValue()); + } + + final boolean isRestricted = component.getClass().isAnnotationPresent(Restricted.class); + if (isRestricted) { + return true; + } + } + + return false; + } + + private static void populateComponentTypes(final VersionedProcessGroup group, final Set<Tuple<String, BundleCoordinate>> componentTypes) { + group.getProcessors().stream() + .map(versionedProc -> new Tuple<>(versionedProc.getType(), createBundleCoordinate(versionedProc.getBundle()))) + .forEach(componentTypes::add); + + group.getControllerServices().stream() + .map(versionedSvc -> new Tuple<>(versionedSvc.getType(), createBundleCoordinate(versionedSvc.getBundle()))) + .forEach(componentTypes::add); + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + populateComponentTypes(childGroup, componentTypes); + } + } + + + public static BundleCoordinate createBundleCoordinate(final Bundle bundle) { + return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); + } + + public static BundleDTO createBundleDto(final Bundle bundle) { + return new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index a68ad0c..d851677 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -407,6 +407,13 @@ public interface NiFiServiceFacade { void verifyComponentTypes(FlowSnippetDTO snippet); /** + * Verifies the types of components in a versioned process group + * + * @param versionedGroup the proposed process group + */ + void verifyComponentTypes(VersionedProcessGroup versionedGroup); + + /** * Creates a new Template based off the specified snippet. * * @param name name @@ -1385,10 +1392,11 @@ public interface NiFiServiceFacade { * @param versionControlInfo the Version Control information * @param snapshot the new snapshot * @param componentIdSeed the seed to use for generating new component ID's + * @param updateSettings whether or not the process group's name and position should be updated * @return the Process Group */ - ProcessGroupEntity updateProcessGroup(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, - boolean verifyNotModified); + ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, + boolean verifyNotModified, boolean updateSettings); // ---------------------------------------- // Component state methods http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 0105bf1..2b5b5c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,32 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -58,6 +83,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -85,10 +111,9 @@ import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; import org.apache.nifi.registry.ComponentVariableRegistry; -import org.apache.nifi.registry.flow.ConnectableComponent; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.RemoteFlowCoordinates; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.UnknownResourceException; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; @@ -103,20 +128,19 @@ import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; -import org.apache.nifi.registry.flow.mapping.InstantiatedConnectableComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.Tuple; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; import org.apache.nifi.web.api.dto.AffectedComponentDTO; @@ -239,6 +263,7 @@ import org.apache.nifi.web.dao.LabelDAO; import org.apache.nifi.web.dao.PortDAO; import org.apache.nifi.web.dao.ProcessGroupDAO; import org.apache.nifi.web.dao.ProcessorDAO; +import org.apache.nifi.web.dao.RegistryDAO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; import org.apache.nifi.web.dao.ReportingTaskDAO; import org.apache.nifi.web.dao.SnippetDAO; @@ -257,30 +282,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -312,6 +314,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private UserDAO userDAO; private UserGroupDAO userGroupDAO; private AccessPolicyDAO accessPolicyDAO; + private RegistryDAO registryDAO; private ClusterCoordinator clusterCoordinator; private HeartbeatMonitor heartbeatMonitor; private LeaderElectionManager leaderElectionManager; @@ -331,8 +334,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private AuthorizableLookup authorizableLookup; - private Map<String, Tuple<Revision, RegistryDTO>> registryCache = new HashMap<>(); - // ----------------------------------------- // Synchronization methods // ----------------------------------------- @@ -1849,6 +1850,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) { + controllerFacade.verifyComponentTypes(versionedGroup); + } + + @Override public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) { // get the specified snippet final Snippet snippet = snippetDAO.getSnippet(snippetId); @@ -2260,44 +2266,101 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createControllerServiceEntity(snapshot, null, permissions, null); } - private RegistryEntity createRegistryEntity(final Revision updatedRevision, final RegistryDTO registryDTO) { - final RegistryEntity entity = new RegistryEntity(); - entity.setId(registryDTO.getId()); - entity.setPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController())); - entity.setRevision(dtoFactory.createRevisionDTO(updatedRevision)); - entity.setComponent(registryDTO); - return entity; - } @Override public RegistryEntity createRegistry(Revision revision, RegistryDTO registryDTO) { - registryCache.put(registryDTO.getId(), new Tuple(revision, registryDTO)); - return createRegistryEntity(revision, registryDTO); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // read lock on the containing group + // request claim for component to be created... revision already verified (version == 0) + final RevisionClaim claim = new StandardRevisionClaim(revision); + + // update revision through revision manager + final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(claim, user, () -> { + // add the component + final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO); + + // save the flow + controllerFacade.save(); + + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(registry, lastMod); + }); + + final FlowRegistry registry = revisionUpdate.getComponent(); + return createRegistryEntity(registry); } @Override - public RegistryEntity getRegistry(String registryId) { - final Tuple<Revision, RegistryDTO> registry = registryCache.get(registryId); - return createRegistry(registry.getKey(), registry.getValue()); + public RegistryEntity getRegistry(final String registryId) { + final FlowRegistry registry = registryDAO.getFlowRegistry(registryId); + return createRegistryEntity(registry); + } + + private RegistryEntity createRegistryEntity(final FlowRegistry flowRegistry) { + if (flowRegistry == null) { + return null; + } + + final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry); + final Revision revision = revisionManager.getRevision(dto.getId()); + + final RegistryEntity entity = new RegistryEntity(); + entity.setComponent(dto); + entity.setRevision(dtoFactory.createRevisionDTO(revision)); + entity.setId(dto.getId()); + + // User who created it can read/write it. + final PermissionsDTO permissions = new PermissionsDTO(); + permissions.setCanRead(true); + permissions.setCanWrite(true); + entity.setPermissions(permissions); + + return entity; } @Override public Set<RegistryEntity> getRegistries() { - return registryCache.values().stream() - .map(registry -> createRegistry(registry.getKey(), registry.getValue())) - .collect(Collectors.toSet()); + return registryDAO.getFlowRegistries().stream() + .map(this::createRegistryEntity) + .collect(Collectors.toSet()); } @Override public RegistryEntity updateRegistry(Revision revision, RegistryDTO registryDTO) { - registryCache.put(registryDTO.getId(), new Tuple(revision, registryDTO)); - return createRegistryEntity(revision, registryDTO); + final RevisionClaim revisionClaim = new StandardRevisionClaim(revision); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId()); + final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> { + registry.setDescription(registryDTO.getDescription()); + registry.setName(registryDTO.getName()); + registry.setURL(registryDTO.getUri()); + + controllerFacade.save(); + + final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); + final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); + + return new StandardRevisionUpdate<FlowRegistry>(registry, lastModification); + }); + + final FlowRegistry updatedReg = revisionUpdate.getComponent(); + return createRegistryEntity(updatedReg); } @Override - public RegistryEntity deleteRegistry(Revision revision, String registryId) { - final Tuple<Revision, RegistryDTO> registry = registryCache.remove(registryId); - return createRegistryEntity(registry.getKey(), registry.getValue()); + public RegistryEntity deleteRegistry(final Revision revision, final String registryId) { + final RevisionClaim claim = new StandardRevisionClaim(revision); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> { + final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId); + controllerFacade.save(); + return reg; + }); + + return createRegistryEntity(registry); } @Override @@ -3665,6 +3728,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { }) .collect(Collectors.toCollection(HashSet::new)); + final Map<String, List<Connection>> connectionsByVersionedId = group.findAllConnections().stream() + .filter(conn -> conn.getVersionedComponentId().isPresent()) + .collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get())); + for (final FlowDifference difference : comparison.getDifferences()) { VersionedComponent component = difference.getComponentA(); if (component == null) { @@ -3674,58 +3741,47 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) { final VersionedConnection connection = (VersionedConnection) component; - final ConnectableComponent source = connection.getSource(); - final ConnectableComponent destination = connection.getDestination(); + final String versionedConnectionId = connection.getIdentifier(); + final List<Connection> instances = connectionsByVersionedId.get(versionedConnectionId); + if (instances == null) { + continue; + } - affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) source, user)); - affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) destination, user)); + for (final Connection instance : instances) { + affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user)); + affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user)); + } } } return affectedComponents; } - private String getComponentState(final InstantiatedConnectableComponent localComponent) { - final String componentId = localComponent.getInstanceId(); - final String groupId = localComponent.getInstanceGroupId(); - switch (localComponent.getType()) { - case PROCESSOR: - return processorDAO.getProcessor(componentId).getPhysicalScheduledState().name(); - case REMOTE_INPUT_PORT: - return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getInputPort(componentId).getScheduledState().name(); - case REMOTE_OUTPUT_PORT: - return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getOutputPort(componentId).getScheduledState().name(); - default: - return null; - } - } - - private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) { + private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) { final AffectedComponentEntity entity = new AffectedComponentEntity(); - entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); - entity.setId(instance.getInstanceId()); + entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier()))); + entity.setId(connectable.getIdentifier()); - final Authorizable authorizable = getAuthorizable(componentTypeName, instance); + final Authorizable authorizable = getAuthorizable(connectable); final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user); entity.setPermissions(permissionsDto); final AffectedComponentDTO dto = new AffectedComponentDTO(); - dto.setId(instance.getInstanceId()); - dto.setReferenceType(componentTypeName); - dto.setProcessGroupId(instance.getInstanceGroupId()); - dto.setState(componentState); + dto.setId(connectable.getIdentifier()); + dto.setReferenceType(connectable.getConnectableType().name()); + dto.setProcessGroupId(connectable.getProcessGroupIdentifier()); + dto.setState(connectable.getScheduledState().name()); entity.setComponent(dto); return entity; } - private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedConnectableComponent instance, final NiFiUser user) { + private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) { final AffectedComponentEntity entity = new AffectedComponentEntity(); entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); entity.setId(instance.getInstanceId()); - final String componentTypeName = instance.getType().name(); final Authorizable authorizable = getAuthorizable(componentTypeName, instance); final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user); entity.setPermissions(permissionsDto); @@ -3734,12 +3790,24 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { dto.setId(instance.getInstanceId()); dto.setReferenceType(componentTypeName); dto.setProcessGroupId(instance.getInstanceGroupId()); - dto.setState(getComponentState(instance)); + dto.setState(componentState); entity.setComponent(dto); return entity; } + + private Authorizable getAuthorizable(final Connectable connectable) { + switch (connectable.getConnectableType()) { + case REMOTE_INPUT_PORT: + case REMOTE_OUTPUT_PORT: + final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier(); + return authorizableLookup.getRemoteProcessGroup(rpgId); + default: + return authorizableLookup.getLocalConnectable(connectable.getIdentifier()); + } + } + private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) { final String componentId = versionedComponent.getInstanceId(); @@ -3820,7 +3888,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException { - final RemoteFlowCoordinates remoteCoordinates = group.getRemoteFlowCoordinates(); + final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates(); if (remoteCoordinates != null) { final String registryUrl = remoteCoordinates.getRegistryUrl(); @@ -3868,17 +3936,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return updateProcessGroup(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified); + return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true); } @Override - public ProcessGroupEntity updateProcessGroup(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) { + public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, + final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) { final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId); final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision, processGroupNode, - () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified), + () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); @@ -4243,27 +4311,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { this.leaderElectionManager = leaderElectionManager; } + public void setRegistryDAO(RegistryDAO registryDao) { + this.registryDAO = registryDao; + } + public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) { this.flowRegistryClient = flowRegistryClient; - - // temp code to load the registry client cache - final Set<String> registryIdentifiers = flowRegistryClient.getRegistryIdentifiers(); - if (registryIdentifiers != null) { - - for (final String registryIdentifier : registryIdentifiers) { - final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryIdentifier); - - final RegistryDTO registry = new RegistryDTO(); - registry.setId(registryIdentifier); - registry.setName(flowRegistry.getName()); - registry.setUri(flowRegistry.getURL()); - registry.setDescription("Default client for storing Flow Revisions to the local disk."); - - final RegistryEntity registryEntity = new RegistryEntity(); - registryEntity.setComponent(registry); - - registryCache.put(registryIdentifier, new Tuple(new Revision(0L, null, registryIdentifier), registry)); - } - } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index ebed0ad..11c548f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,12 +16,57 @@ */ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriInfo; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLStreamReader; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizeAccess; @@ -42,6 +87,8 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowEncodingVersion; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.registry.flow.FlowRegistryUtils; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; @@ -64,6 +111,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; @@ -104,55 +152,12 @@ import org.slf4j.LoggerFactory; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriInfo; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.stream.XMLStreamReader; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Group. @@ -826,7 +831,7 @@ public class ProcessGroupResource extends ApplicationResource { } final Map<String, String> headers = new HashMap<>(); - final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap(); + final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>(); boolean continuePolling = true; while (continuePolling) { @@ -983,7 +988,7 @@ public class ProcessGroupResource extends ApplicationResource { } final Map<String, String> headers = new HashMap<>(); - final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap(); + final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>(); boolean continuePolling = true; while (continuePolling) { @@ -1513,9 +1518,10 @@ public class ProcessGroupResource extends ApplicationResource { * Adds the specified process group. * * @param httpServletRequest request - * @param groupId The group id + * @param groupId The group id * @param requestProcessGroupEntity A processGroupEntity * @return A processGroupEntity + * @throws IOException if the request indicates that the Process Group should be imported from a Flow Registry and NiFi is unable to communicate with the Flow Registry */ @POST @Consumes(MediaType.APPLICATION_JSON) @@ -1547,7 +1553,7 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The process group configuration details.", required = true - ) final ProcessGroupEntity requestProcessGroupEntity) { + ) final ProcessGroupEntity requestProcessGroupEntity) throws IOException { if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) { throw new IllegalArgumentException("Process group details must be specified."); @@ -1574,6 +1580,28 @@ public class ProcessGroupResource extends ApplicationResource { } requestProcessGroupEntity.getComponent().setParentGroupId(groupId); + // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. + // Step 2: Retrieve flow from Flow Registry + // Step 3: Resolve Bundle info + // Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added. + // Step 5: If any of the components is a Restricted Component, then we must authorize the user + // for write access to the RestrictedComponents resource + // Step 6: Replicate the request or call serviceFacade.updateProcessGroup + + final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); + if (versionControlInfo != null) { + // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. + // Step 2: Retrieve flow from Flow Registry + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo); + + // Step 3: Resolve Bundle info + BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); + + // Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added. + requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot); + } + + // Step 6: Replicate the request or call serviceFacade.updateProcessGroup if (isReplicateRequest()) { return replicate(HttpMethod.POST, requestProcessGroupEntity); } @@ -1584,8 +1612,23 @@ public class ProcessGroupResource extends ApplicationResource { lookup -> { final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + + // Step 5: If any of the components is a Restricted Component, then we must authorize the user + // for write access to the RestrictedComponents resource + final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); + if (versionedFlowSnapshot != null) { + final boolean containsRestrictedComponent = FlowRegistryUtils.containsRestrictedComponent(versionedFlowSnapshot.getFlowContents()); + if (containsRestrictedComponent) { + lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + } + } + }, + () -> { + final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); + if (versionedFlowSnapshot != null) { + serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents()); + } }, - null, processGroupGroupEntity -> { // set the processor id as appropriate processGroupGroupEntity.getComponent().setId(generateUuid()); @@ -1593,6 +1636,16 @@ public class ProcessGroupResource extends ApplicationResource { // create the process group contents final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId()); final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent()); + + final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); + if (flowSnapshot != null) { + final RevisionDTO revisionDto = entity.getRevision(); + final String newGroupId = entity.getComponent().getId(); + final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId); + serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, + versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false); + } + populateRemainingProcessGroupEntityContent(entity); // generate a 201 created response http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 9fbd5e8..27216a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,12 +17,39 @@ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.Authorizer; @@ -30,12 +57,11 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.registry.flow.Bundle; import org.apache.nifi.registry.flow.ComponentType; +import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; @@ -48,7 +74,6 @@ import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest; import org.apache.nifi.web.api.concurrent.RequestManager; import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest; import org.apache.nifi.web.api.dto.AffectedComponentDTO; -import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; @@ -71,36 +96,12 @@ import org.apache.nifi.web.util.Pause; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; @Path("/versions") @Api(value = "/versions", description = "Endpoint for managing version control for a flow") @@ -163,7 +164,7 @@ public class VersionsResource extends ApplicationResource { @POST - @Consumes(MediaType.APPLICATION_JSON) + @Consumes(MediaType.WILDCARD) @Produces(MediaType.APPLICATION_JSON) @Path("start-requests") @ApiOperation( @@ -402,10 +403,10 @@ public class VersionsResource extends ApplicationResource { final NodeResponse clusterResponse; try { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, null, Collections.emptyMap()).awaitMergedResponse(); + clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); } else { clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, null, Collections.emptyMap()).awaitMergedResponse(); + getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); } } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); @@ -466,10 +467,10 @@ public class VersionsResource extends ApplicationResource { final NodeResponse clusterResponse; try { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, null, Collections.emptyMap()).awaitMergedResponse(); + clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); } else { clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, null, Collections.emptyMap()).awaitMergedResponse(); + getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); } } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); @@ -942,7 +943,7 @@ public class VersionsResource extends ApplicationResource { // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. - discoverCompatibleBundles(flowSnapshot.getFlowContents()); + BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); // Step 1: Determine which components will be affected by updating the version final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user); @@ -956,6 +957,12 @@ public class VersionsResource extends ApplicationResource { lookup -> { // Step 2: Verify READ and WRITE permissions for user, for every component affected. authorizeAffectedComponents(lookup, affectedComponents); + + final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents(); + final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents); + if (containsRestrictedComponents) { + lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + } }, () -> { // Step 3: Verify that all components in the snapshot exist on all nodes @@ -1070,7 +1077,7 @@ public class VersionsResource extends ApplicationResource { // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. - discoverCompatibleBundles(flowSnapshot.getFlowContents()); + BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); // Step 1: Determine which components will be affected by updating the version final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user); @@ -1084,6 +1091,12 @@ public class VersionsResource extends ApplicationResource { lookup -> { // Step 2: Verify READ and WRITE permissions for user, for every component affected. authorizeAffectedComponents(lookup, affectedComponents); + + final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents(); + final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents); + if (containsRestrictedComponents) { + lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + } }, () -> { // Step 3: Verify that all components in the snapshot exist on all nodes @@ -1258,7 +1271,7 @@ public class VersionsResource extends ApplicationResource { final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation(); - serviceFacade.updateProcessGroup(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified); + serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false); } asyncRequest.setLastUpdated(new Date()); @@ -1384,50 +1397,6 @@ public class VersionsResource extends ApplicationResource { this.dtoFactory = dtoFactory; } - private BundleDTO createBundleDto(final Bundle bundle) { - final BundleDTO dto = new BundleDTO(); - dto.setArtifact(bundle.getArtifact()); - dto.setGroup(dto.getGroup()); - dto.setVersion(dto.getVersion()); - return dto; - } - - /** - * Discovers the compatible bundle details for the components in the specified snippet. - * - * @param versionedGroup the versioned group - */ - private void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) { - if (versionedGroup.getProcessors() != null) { - versionedGroup.getProcessors().forEach(processor -> { - final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle())); - - final Bundle bundle = new Bundle(); - bundle.setArtifact(coordinate.getId()); - bundle.setGroup(coordinate.getGroup()); - bundle.setVersion(coordinate.getVersion()); - processor.setBundle(bundle); - }); - } - - if (versionedGroup.getControllerServices() != null) { - versionedGroup.getControllerServices().forEach(controllerService -> { - final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle())); - - final Bundle bundle = new Bundle(); - bundle.setArtifact(coordinate.getId()); - bundle.setGroup(coordinate.getGroup()); - bundle.setVersion(coordinate.getVersion()); - controllerService.setBundle(bundle); - }); - } - - if (versionedGroup.getProcessGroups() != null) { - versionedGroup.getProcessGroups().forEach(processGroup -> { - discoverCompatibleBundles(processGroup); - }); - } - } private static class ActiveRequest { private static final long MAX_REQUEST_LOCK_NANOS = TimeUnit.MINUTES.toNanos(1L); http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index ae3fc56..8e0f0c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -113,6 +113,7 @@ import org.apache.nifi.provenance.lineage.LineageEdge; import org.apache.nifi.provenance.lineage.LineageNode; import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode; import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection; @@ -3713,6 +3714,15 @@ public final class DtoFactory { return nodeDto; } + public RegistryDTO createRegistryDto(FlowRegistry registry) { + final RegistryDTO dto = new RegistryDTO(); + dto.setDescription(registry.getDescription()); + dto.setId(registry.getIdentifier()); + dto.setName(registry.getName()); + dto.setUri(registry.getURL()); + return dto; + } + /* setters */ public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 615f00b..29e5f7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -82,6 +82,7 @@ import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.ReportingTask; @@ -1643,6 +1644,9 @@ public class ControllerFacade implements Authorizable { return dto; } + public void verifyComponentTypes(VersionedProcessGroup versionedFlow) { + flowController.verifyComponentTypesInSnippet(versionedFlow); + } private ComponentSearchResultDTO search(final String searchStr, final ProcessorNode procNode) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 806979f..650d4b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -113,10 +113,11 @@ public interface ProcessGroupDAO { * @param proposedSnapshot Flow the new version of the flow * @param versionControlInformation the new Version Control Information * @param componentIdSeed the seed value to use for generating ID's for new components + * @param updateSettings whether or not to update the process group's name and position * @return the process group */ ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, - boolean verifyNotModified); + boolean verifyNotModified, boolean updateSettings); /** * Applies the given Version Control Information to the Process Group http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java new file mode 100644 index 0000000..83b5c6d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.dao; + +import java.util.Set; + +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.web.api.dto.RegistryDTO; + +public interface RegistryDAO { + + FlowRegistry createFlowRegistry(RegistryDTO registryDto); + + FlowRegistry getFlowRegistry(String registryId); + + Set<FlowRegistry> getFlowRegistries(); + + FlowRegistry removeFlowRegistry(String registryId); + +}
