NIFI-4436: Added additional endpoints; bug fixes Signed-off-by: Matt Gilman <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6aa8b5c6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6aa8b5c6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6aa8b5c6 Branch: refs/heads/master Commit: 6aa8b5c61c734ce56aad7816b40df88d8316feeb Parents: 7a0a900 Author: Mark Payne <[email protected]> Authored: Mon Oct 30 16:35:59 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:52 2018 -0500 ---------------------------------------------------------------------- .../nifi/web/api/entity/ProcessGroupEntity.java | 17 +- .../org/apache/nifi/groups/ProcessGroup.java | 3 +- .../apache/nifi/registry/flow/FlowRegistry.java | 30 ++ .../nifi/registry/flow/FlowRegistryClient.java | 6 + .../apache/nifi/controller/FlowController.java | 103 ++++ .../controller/StandardFlowSynchronizer.java | 36 +- .../serialization/StandardFlowSerializer.java | 29 +- .../nifi/fingerprint/FingerprintFactory.java | 21 + .../nifi/groups/StandardProcessGroup.java | 134 ++++-- .../registry/flow/FileBasedFlowRegistry.java | 478 +++++++++++++++++++ .../flow/FileBasedFlowRegistryClient.java | 435 ----------------- .../flow/StandardFlowRegistryClient.java | 75 +++ .../flow/mapping/NiFiRegistryFlowMapper.java | 95 +++- .../java/org/apache/nifi/util/BundleUtils.java | 48 +- .../src/main/resources/FlowConfiguration.xsd | 17 + .../src/main/resources/nifi-context.xml | 4 +- .../service/mock/MockProcessGroup.java | 2 +- .../fingerprint/FingerprintFactoryTest.java | 8 + .../nifi/registry/flow/FlowRegistryUtils.java | 74 +++ .../org/apache/nifi/web/NiFiServiceFacade.java | 12 +- .../nifi/web/StandardNiFiServiceFacade.java | 266 ++++++----- .../nifi/web/api/ProcessGroupResource.java | 173 ++++--- .../apache/nifi/web/api/VersionsResource.java | 151 +++--- .../org/apache/nifi/web/api/dto/DtoFactory.java | 10 + .../nifi/web/controller/ControllerFacade.java | 4 + 
.../apache/nifi/web/dao/ProcessGroupDAO.java | 3 +- .../org/apache/nifi/web/dao/RegistryDAO.java | 35 ++ .../nifi/web/dao/impl/FlowRegistryDAO.java | 66 +++ .../web/dao/impl/StandardProcessGroupDAO.java | 5 +- .../src/main/resources/nifi-web-api-context.xml | 4 + 30 files changed, 1578 insertions(+), 766 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java index 83af9d8..1e2a4b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java @@ -16,11 +16,13 @@ */ package org.apache.nifi.web.api.entity; -import io.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; -import javax.xml.bind.annotation.XmlRootElement; +import io.swagger.annotations.ApiModelProperty; /** * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessGroupDTO. 
@@ -30,6 +32,7 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P private ProcessGroupDTO component; private ProcessGroupStatusDTO status; + private VersionedFlowSnapshot versionedFlowSnapshot; private Integer runningCount; private Integer stoppedCount; @@ -46,10 +49,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P * * @return The ProcessGroupDTO object */ + @Override public ProcessGroupDTO getComponent() { return component; } + @Override public void setComponent(ProcessGroupDTO component) { this.component = component; } @@ -180,4 +185,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P this.inactiveRemotePortCount = inactiveRemotePortCount; } + @ApiModelProperty(value = "Returns the Versioned Flow that describes the contents of the Versioned Flow to be imported", readOnly = true) + public VersionedFlowSnapshot getVersionedFlowSnapshot() { + return versionedFlowSnapshot; + } + + public void setVersionedFlowSnapshot(VersionedFlowSnapshot versionedFlowSnapshot) { + this.versionedFlowSnapshot = versionedFlowSnapshot; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index d335461..16b4b5e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ 
-783,8 +783,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>, * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will * throw an IllegalStateException + * @param updateSettings whether or not to update the process group's name and positions */ - void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty); + void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings); /** * Verifies a template with the specified name can be created. http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 962a940..4efff94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -24,6 +24,22 @@ import java.io.IOException; import java.util.Set; public interface FlowRegistry { + /** + * @return the ID of the Flow Registry + */ + String getIdentifier(); + + /** + * @return the description of the Flow Registry + */ + String getDescription(); + + /** + * Updates the Flow Registry's description + * + * @param 
description the description of the Flow Registry + */ + void setDescription(String description); /** * @return the URL of the Flow Registry @@ -31,11 +47,25 @@ public interface FlowRegistry { String getURL(); /** + * Updates the Flow Registry's URL + * + * @param url the URL of the Flow Registry + */ + void setURL(String url); + + /** * @return the name of the Flow Registry */ String getName(); /** + * Updates the name of the Flow Registry + * + * @param name the name of the Flow Registry + */ + void setName(String name); + + /** * Gets the buckets for the specified user. * * @param user current user http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java index 83f66dc..77c2761 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java @@ -34,4 +34,10 @@ public interface FlowRegistryClient { } Set<String> getRegistryIdentifiers(); + + void addFlowRegistry(FlowRegistry registry); + + FlowRegistry addFlowRegistry(String registryId, String registryName, String registryUrl, String description); + + FlowRegistry removeFlowRegistry(String registryId); } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 242ef6a..5ed5b6e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -165,6 +165,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; @@ -2128,6 +2130,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) { + final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion()); + if (!supportedBundles.contains(requiredCoordinate)) { + throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate); + } + } + private void 
verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) { if (templateContents.getProcessors() != null) { templateContents.getProcessors().forEach(processor -> { @@ -2150,6 +2159,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) { + if (versionedFlow.getProcessors() != null) { + versionedFlow.getProcessors().forEach(processor -> { + if (processor.getBundle() == null) { + throw new IllegalArgumentException("Processor bundle must be specified."); + } + + if (supportedTypes.containsKey(processor.getType())) { + verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType())); + } else { + throw new IllegalStateException("Invalid Processor Type: " + processor.getType()); + } + }); + } + + if (versionedFlow.getProcessGroups() != null) { + versionedFlow.getProcessGroups().forEach(processGroup -> { + verifyProcessorsInVersionedFlow(processGroup, supportedTypes); + }); + } + } + private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) { if (templateContents.getControllerServices() != null) { templateContents.getControllerServices().forEach(controllerService -> { @@ -2172,6 +2203,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) { + if (versionedFlow.getControllerServices() != null) { + versionedFlow.getControllerServices().forEach(controllerService -> { + if (supportedTypes.containsKey(controllerService.getType())) { + if (controllerService.getBundle() == null) { + throw new IllegalArgumentException("Controller Service 
bundle must be specified."); + } + + verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType())); + } else { + throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType()); + } + }); + } + + if (versionedFlow.getProcessGroups() != null) { + versionedFlow.getProcessGroups().forEach(processGroup -> { + verifyControllerServicesInVersionedFlow(processGroup, supportedTypes); + }); + } + } + public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) { final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>(); for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) { @@ -2210,6 +2263,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) { + final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>(); + for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) { + final String name = c.getName(); + processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet())); + } + verifyProcessorsInVersionedFlow(versionedFlow, processorClasses); + + final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>(); + for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) { + final String name = c.getName(); + controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet())); + } + verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses); + + final Set<String> prioritizerClasses = new HashSet<>(); + for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) { + prioritizerClasses.add(c.getName()); + } + + final 
Set<VersionedConnection> allConns = new HashSet<>(); + allConns.addAll(versionedFlow.getConnections()); + for (final VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) { + allConns.addAll(findAllConnections(childGroup)); + } + + for (final VersionedConnection conn : allConns) { + final List<String> prioritizers = conn.getPrioritizers(); + if (prioritizers != null) { + for (final String prioritizer : prioritizers) { + if (!prioritizerClasses.contains(prioritizer)) { + throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer); + } + } + } + } + } + /** * <p> * Verifies that the given DTO is valid, according to the following: @@ -2270,6 +2361,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return conns; } + private Set<VersionedConnection> findAllConnections(final VersionedProcessGroup group) { + final Set<VersionedConnection> conns = new HashSet<>(); + for (final VersionedConnection connection : group.getConnections()) { + conns.add(connection); + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + conns.addAll(findAllConnections(childGroup)); + } + return conns; + } + // // Processor access // http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index e879e38..5a7aeec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -85,6 +85,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.RemoteGroupPort; @@ -184,7 +185,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { try { if (flowAlreadySynchronized) { existingFlow = toBytes(controller); - existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() && controller.getAllReportingTasks().isEmpty() && controller.getAllControllerServices().isEmpty(); + existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() + && controller.getAllReportingTasks().isEmpty() + && controller.getAllControllerServices().isEmpty() + && controller.getFlowRegistryClient().getRegistryIdentifiers().isEmpty(); } else { existingFlow = readFlowFromDisk(); if (existingFlow == null || existingFlow.length == 0) { @@ -220,10 +224,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); } + final boolean registriesPresent; + final Element registriesElement = DomUtils.getChild(rootElement, "registries"); + if (registriesElement == null) { + registriesPresent = false; + } else { + final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry"); + registriesPresent = !flowRegistryElems.isEmpty(); + } + logger.trace("Parsing process group from DOM"); final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0); final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion); - existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto); + existingFlowEmpty = taskElements.isEmpty() + && unrootedControllerServiceElements.isEmpty() + && isEmpty(rootGroupDto) + && registriesPresent; logger.debug("Existing Flow Empty = {}", existingFlowEmpty); } } @@ -318,6 +334,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // get the root group XML element final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); + if (!flowAlreadySynchronized || existingFlowEmpty) { + final Element registriesElement = DomUtils.getChild(rootElement, "registries"); + if (registriesElement != null) { + final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry"); + for (final Element flowRegistryElement : flowRegistryElems) { + final String registryId = getString(flowRegistryElement, "id"); + final String registryName = getString(flowRegistryElement, "name"); + final String registryUrl = getString(flowRegistryElement, "url"); + final String description = getString(flowRegistryElement, "description"); + + final FlowRegistryClient client = controller.getFlowRegistryClient(); + client.addFlowRegistry(registryId, registryName, registryUrl, description); + } + } + } + // if this controller isn't initialized or its empty, add the root group, otherwise update final ProcessGroup rootGroup; if (!flowAlreadySynchronized || existingFlowEmpty) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index ecf2438..f921bc6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -39,6 +39,8 @@ import org.apache.nifi.persistence.TemplateSerializer; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; @@ -74,7 +76,7 @@ import java.util.concurrent.TimeUnit; */ public class StandardFlowSerializer implements FlowSerializer { - private static final String MAX_ENCODING_VERSION = "1.2"; + private static final String MAX_ENCODING_VERSION = "1.3"; private final StringEncryptor encryptor; @@ -98,6 +100,11 @@ public class StandardFlowSerializer implements FlowSerializer { doc.appendChild(rootNode); addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount()); addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); + + final Element registriesElement = doc.createElement("registries"); + rootNode.appendChild(registriesElement); + + addFlowRegistries(registriesElement, controller.getFlowRegistryClient()); addProcessGroup(rootNode, 
controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup); // Add root-level controller services @@ -130,6 +137,26 @@ public class StandardFlowSerializer implements FlowSerializer { } } + private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) { + for (final String registryId : registryClient.getRegistryIdentifiers()) { + final FlowRegistry flowRegistry = registryClient.getFlowRegistry(registryId); + + final Element registryElement = parentElement.getOwnerDocument().createElement("flowRegistry"); + parentElement.appendChild(registryElement); + + addStringElement(registryElement, "id", flowRegistry.getIdentifier()); + addStringElement(registryElement, "name", flowRegistry.getName()); + addStringElement(registryElement, "url", flowRegistry.getURL()); + addStringElement(registryElement, "description", flowRegistry.getDescription()); + } + } + + private void addStringElement(final Element parentElement, final String elementName, final String value) { + final Element childElement = parentElement.getOwnerDocument().createElement(elementName); + childElement.setTextContent(value); + parentElement.appendChild(childElement); + } + private void addSize(final Element parentElement, final Size size) { final Element element = parentElement.getOwnerDocument().createElement("size"); element.setAttribute("width", String.valueOf(size.getWidth())); http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 
3aa5084..e1846a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Stream; import javax.xml.XMLConstants; import javax.xml.parsers.DocumentBuilder; @@ -198,6 +199,21 @@ public class FingerprintFactory { } private StringBuilder addFlowControllerFingerprint(final StringBuilder builder, final Element flowControllerElem, final FlowController controller) { + // registries + final Element registriesElement = DomUtils.getChild(flowControllerElem, "registries"); + if (registriesElement == null) { + builder.append("NO_VALUE"); + } else { + final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry"); + if (flowRegistryElems.isEmpty()) { + builder.append("NO_VALUE"); + } else { + for (final Element flowRegistryElement : flowRegistryElems) { + addFlowRegistryFingerprint(builder, flowRegistryElement); + } + } + } + // root group final Element rootGroupElem = (Element) DomUtils.getChildNodesByTagName(flowControllerElem, "rootGroup").item(0); addProcessGroupFingerprint(builder, rootGroupElem, controller); @@ -265,6 +281,11 @@ public class FingerprintFactory { return builder; } + private StringBuilder addFlowRegistryFingerprint(final StringBuilder builder, final Element flowRegistryElement) { + Stream.of("id", "name", "url", "description").forEach(elementName -> appendFirstValue(builder, DomUtils.getChildNodesByTagName(flowRegistryElement, elementName))); + return builder; + } + private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException { 
// id appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id")); http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 3b8117b..1d8652e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -72,7 +72,7 @@ import org.apache.nifi.registry.flow.Bundle; import org.apache.nifi.registry.flow.ConnectableComponent; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.RemoteFlowCoordinates; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.UnknownResourceException; import org.apache.nifi.registry.flow.VersionControlInformation; @@ -2835,11 +2835,14 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @Override public void disconnectVersionControl() { writeLock.lock(); try { - // TODO remove version component ids from each component (until another versioned PG is encountered) this.versionControlInfo.set(null); + + // remove version component ids from each component (until another versioned PG is encountered) + applyVersionedComponentIds(this, id -> null); } finally { 
writeLock.unlock(); } @@ -2850,36 +2853,41 @@ public final class StandardProcessGroup implements ProcessGroup { return; } - processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier())); + applyVersionedComponentIds(processGroup, versionedComponentIds::get); + } + + private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) { + processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier())); processGroup.getConnections().stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getProcessors().stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getInputPorts().stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getOutputPorts().stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getLabels().stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getFunnels().stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> 
component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getControllerServices(false).stream() - .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier()))); processGroup.getRemoteProcessGroups().stream() .forEach(rpg -> { - rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier())); + rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier())); rpg.getInputPorts().stream() - .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier()))); + .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier()))); rpg.getOutputPorts().stream() - .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier()))); + .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier()))); }); processGroup.getProcessGroups().stream() - .forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds)); + .filter(childGroup -> childGroup.getVersionControlInformation() == null) // recurse only into children that are not themselves under version control; a versioned child marks the boundary + .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); } @@ -2931,10 +2939,10 @@ public final class StandardProcessGroup implements ProcessGroup { @Override - public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) { + public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) { writeLock.lock(); try { - verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc.
Recursively if child is under VC also + verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient()); @@ -2950,15 +2958,15 @@ public final class StandardProcessGroup implements ProcessGroup { .map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier()) .collect(Collectors.toSet()); - if (LOG.isDebugEnabled()) { - LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences()); - } else { - // TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient - // than having to remember to enable DEBUG level logging every time a full build is done. - LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences()); + if (LOG.isInfoEnabled()) { + final String differencesByLine = flowComparison.getDifferences().stream() + .map(FlowDifference::toString) + .collect(Collectors.joining("\n")); + + LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine); } - updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false); + updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings); } catch (final ProcessorInstantiationException pie) { throw new RuntimeException(pie); } finally { @@ -2968,10 +2976,14 @@ public final class StandardProcessGroup implements ProcessGroup { private void 
updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed, - final Set<String> updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException { + final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException { group.setComments(proposed.getComments()); - group.setName(proposed.getName()); + + if (updateName) { + group.setName(proposed.getName()); + } + if (updatePosition && proposed.getPosition() != null) { group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); } @@ -2998,7 +3010,7 @@ public final class StandardProcessGroup implements ProcessGroup { group.setVariables(updatedVariableMap); - final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates(); + final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates(); if (remoteCoordinates != null) { final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl()); final String bucketId = remoteCoordinates.getBucketId(); @@ -3022,7 +3034,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed); LOG.info("Added {} to {}", added, this); } else { - updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true); + updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName); LOG.info("Updated {}", childGroup); } @@ -3136,14 +3148,29 @@ public final class StandardProcessGroup implements ProcessGroup { final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream() .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); final 
Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet()); + final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>(); for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); if (processor == null) { final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed); + + final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream() + .map(relName -> added.getRelationship(relName)) + .collect(Collectors.toSet()); + autoTerminatedRelationships.put(added, proposedAutoTerminated); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { updateProcessor(processor, proposedProcessor); + + final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream() + .map(relName -> processor.getRelationship(relName)) + .collect(Collectors.toSet()); + + if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) { + autoTerminatedRelationships.put(processor, proposedAutoTerminated); + } + LOG.info("Updated {}", processor); } else { processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); @@ -3205,6 +3232,13 @@ public final class StandardProcessGroup implements ProcessGroup { group.removeConnection(connection); } + // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships. + // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated, + // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a + // Connection for that relationship exists. This will throw an Exception. 
+ autoTerminatedRelationships.forEach((proc, rels) -> proc.setAutoTerminatedRelationships(rels)); + + // Remove all controller services no longer in use for (final String removedVersionedId : controllerServicesRemoved) { final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", service, group); @@ -3276,7 +3310,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); group.setVersionedComponentId(proposed.getIdentifier()); addProcessGroup(group); - updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true); + updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true); return group; } @@ -3535,10 +3569,6 @@ public final class StandardProcessGroup implements ProcessGroup { processor.setYieldPeriod(proposed.getYieldDuration()); processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); - processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream() - .map(relName -> processor.getRelationship(relName)) - .collect(Collectors.toSet())); - if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet()); @@ -3547,6 +3577,7 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) { final Map<String, String> fullPropertyMap = new HashMap<>(); for (final PropertyDescriptor property : currentProperties.keySet()) { @@ -3646,7 +3677,7 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public String getName() { - 
return "Flow Under Version Control"; + return "Versioned Flow"; } }; @@ -3659,7 +3690,7 @@ public final class StandardProcessGroup implements ProcessGroup { .findAny() .isPresent(); - LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences); + LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences); return Optional.of(modified); } @@ -3669,27 +3700,24 @@ public final class StandardProcessGroup implements ProcessGroup { readLock.lock(); try { final VersionControlInformation versionControlInfo = getVersionControlInformation(); - if (versionControlInfo == null) { - throw new IllegalStateException("Cannot update the Version of the flow for " + this - + " because the Process Group is not currently under Version Control"); - } - - if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) { - throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with"); - } - - if (verifyNotDirty) { - final Optional<Boolean> modifiedOption = versionControlInfo.getModified(); - if (!modifiedOption.isPresent()) { - throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow " - + "has not yet been synchronized with the Flow Registry. The Process Group must be" - + " synched with the Flow Registry before continuing.
This will happen periodically in the background, so please try the request again later"); + if (versionControlInfo != null) { + if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) { + throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with"); } - if (Boolean.TRUE.equals(modifiedOption.get())) { - throw new IllegalStateException("Cannot change the Version of the flow for " + this - + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be" - + " restored to its original form before changing the version"); + if (verifyNotDirty) { + final Optional<Boolean> modifiedOption = versionControlInfo.getModified(); + if (!modifiedOption.isPresent()) { + throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow " + + "has not yet been synchronized with the Flow Registry. The Process Group must be" + + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later"); + } + + if (Boolean.TRUE.equals(modifiedOption.get())) { + throw new IllegalStateException("Cannot change the Version of the flow for " + this + + " because the Process Group has been modified since it was last synchronized with the Flow Registry. 
The Process Group must be" + + " restored to its original form before changing the version"); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java new file mode 100644 index 0000000..da5880c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.flow; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.registry.bucket.Bucket; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A simple file-based implementation of a Flow Registry Client. Rather than interacting + * with an actual Flow Registry, this implementation simply reads flows from disk and writes + * them to disk. It is not meant for any production use but is available for testing purposes. 
+ */ +public class FileBasedFlowRegistry implements FlowRegistry { + private final File directory; + private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>(); + private final JsonFactory jsonFactory = new JsonFactory(); + private final String id; + private volatile String name = "Local Registry"; + private volatile String url = "file:" + (new File("..").getAbsolutePath()); + private volatile String description = "Default file-based Flow Registry"; + + public FileBasedFlowRegistry(final String id, final String url) throws IOException { + final URI uri = URI.create(url); + if (!"file".equalsIgnoreCase(uri.getScheme())) { // constant-first comparison: getScheme() returns null when the URL has no scheme + throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'"); + } + + this.directory = new File(uri.getPath()); + + if (!directory.exists() && !directory.mkdirs()) { + throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry"); + } + + this.id = id; + this.url = url; + recoverBuckets(); + } + + private void recoverBuckets() throws IOException { + final File[] bucketDirs = directory.listFiles(); + if (bucketDirs == null) { + throw new IOException("Could not get listing of directory " + directory); + } + + for (final File bucketDir : bucketDirs) { + final File[] flowDirs = bucketDir.listFiles(); + if (flowDirs == null) { + throw new IOException("Could not get listing of directory " + bucketDir); + } + + final Set<String> flowNames = new HashSet<>(); + for (final File flowDir : flowDirs) { + final File propsFile = new File(flowDir, "flow.properties"); + if (!propsFile.exists()) { + continue; + } + + final Properties properties = new Properties(); + try (final InputStream in = new FileInputStream(propsFile)) { + properties.load(in); + } + + final String flowName = properties.getProperty("name"); + if (flowName == null) { + continue; + } + + flowNames.add(flowName); + } + + if
(!flowNames.isEmpty()) { + flowNamesByBucket.put(bucketDir.getName(), flowNames); + } + } + } + + @Override + public String getURL() { + return url; + } + + @Override + public String getName() { + return name; + } + + @Override + public Set<Bucket> getBuckets(NiFiUser user) throws IOException { + final Set<Bucket> buckets = new HashSet<>(); + + final File[] bucketDirs = directory.listFiles(); + if (bucketDirs == null) { + throw new IOException("Could not get listing of directory " + directory); + } + + for (final File bucketDirectory : bucketDirs) { + final String bucketIdentifier = bucketDirectory.getName(); + final long creation = bucketDirectory.lastModified(); + + final Bucket bucket = new Bucket(); + bucket.setIdentifier(bucketIdentifier); + bucket.setName("Bucket '" + bucketIdentifier + "'"); + bucket.setCreatedTimestamp(creation); + + final Set<VersionedFlow> versionedFlows = new HashSet<>(); + final File[] flowDirs = bucketDirectory.listFiles(); + if (flowDirs != null) { + for (final File flowDir : flowDirs) { + final String flowIdentifier = flowDir.getName(); + try { + final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier); + versionedFlows.add(versionedFlow); + } catch (UnknownResourceException e) { + continue; + } + } + } + + bucket.setVersionedFlows(versionedFlows); + + buckets.add(bucket); + } + + return buckets; + } + + + @Override + public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException { + Objects.requireNonNull(flow); + Objects.requireNonNull(flow.getBucketIdentifier()); + Objects.requireNonNull(flow.getName()); + + // Verify that bucket exists + final File bucketDir = new File(directory, flow.getBucketIdentifier()); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); + } + + // Verify that there is no flow with the same name in that bucket + final Set<String> flowNames = 
flowNamesByBucket.get(flow.getBucketIdentifier()); + if (flowNames != null && flowNames.contains(flow.getName())) { + throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier()); + } + + final String flowIdentifier = UUID.randomUUID().toString(); + final File flowDir = new File(bucketDir, flowIdentifier); + if (!flowDir.mkdirs()) { + throw new IOException("Failed to create directory " + flowDir + " for new Flow"); + } + + final File propertiesFile = new File(flowDir, "flow.properties"); + + final Properties flowProperties = new Properties(); + flowProperties.setProperty("name", flow.getName()); + flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp())); + flowProperties.setProperty("description", flow.getDescription() == null ? "" : flow.getDescription()); // Properties rejects null values and the description is optional + flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp())); + + try (final OutputStream out = new FileOutputStream(propertiesFile)) { + flowProperties.store(out, null); + } + flowNamesByBucket.computeIfAbsent(flow.getBucketIdentifier(), key -> new HashSet<>()).add(flow.getName()); // record the new name so the duplicate check above also covers flows registered during this run + final VersionedFlow response = new VersionedFlow(); + response.setBucketIdentifier(flow.getBucketIdentifier()); + response.setCreatedTimestamp(flow.getCreatedTimestamp()); + response.setDescription(flow.getDescription()); + response.setIdentifier(flowIdentifier); + response.setModifiedTimestamp(flow.getModifiedTimestamp()); + response.setName(flow.getName()); + + return response; + } + + @Override + public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments) + throws IOException, UnknownResourceException { + Objects.requireNonNull(flow); + Objects.requireNonNull(flow.getBucketIdentifier()); + Objects.requireNonNull(flow.getName()); + Objects.requireNonNull(snapshot); + + // Verify that the bucket exists + final File bucketDir = new File(directory, flow.getBucketIdentifier()); + if (!bucketDir.exists()) { + throw new
UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flow.getIdentifier()); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier()); + } + + final File[] versionDirs = flowDir.listFiles(); + if (versionDirs == null) { + throw new IOException("Unable to perform listing of directory " + flowDir); + } + + int maxVersion = 0; + for (final File versionDir : versionDirs) { + final String versionName = versionDir.getName(); + + final int version; + try { + version = Integer.parseInt(versionName); + } catch (final NumberFormatException nfe) { + continue; + } + + if (version > maxVersion) { + maxVersion = version; + } + } + + final int snapshotVersion = maxVersion + 1; + final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion)); + if (!snapshotDir.mkdir()) { + throw new IOException("Could not create directory " + snapshotDir); + } + + final File contentsFile = new File(snapshotDir, "flow.xml"); + + try (final OutputStream out = new FileOutputStream(contentsFile); + final JsonGenerator generator = jsonFactory.createGenerator(out)) { + generator.setCodec(new ObjectMapper()); + generator.setPrettyPrinter(new DefaultPrettyPrinter()); + generator.writeObject(snapshot); + } + + final Properties snapshotProperties = new Properties(); + snapshotProperties.setProperty("comments", comments == null ? "" : comments); // Properties rejects null values and comments are not validated as non-null above + snapshotProperties.setProperty("name", flow.getName()); + final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties"); + try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) { + snapshotProperties.store(out, null); + } + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier()); + snapshotMetadata.setComments(comments); +
snapshotMetadata.setFlowIdentifier(flow.getIdentifier()); + snapshotMetadata.setFlowName(flow.getName()); + snapshotMetadata.setTimestamp(System.currentTimeMillis()); + snapshotMetadata.setVersion(snapshotVersion); + + final VersionedFlowSnapshot response = new VersionedFlowSnapshot(); + response.setSnapshotMetadata(snapshotMetadata); + response.setFlowContents(snapshot); + return response; + } + + @Override + public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException { + // Verify that the bucket exists + final File bucketDir = new File(directory, bucketId); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + bucketId); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flowId); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); + } + + final File[] versionDirs = flowDir.listFiles(); + if (versionDirs == null) { + throw new IOException("Unable to perform listing of directory " + flowDir); + } + + int maxVersion = 0; + for (final File versionDir : versionDirs) { + final String versionName = versionDir.getName(); + + final int version; + try { + version = Integer.parseInt(versionName); + } catch (final NumberFormatException nfe) { + continue; + } + + if (version > maxVersion) { + maxVersion = version; + } + } + + return maxVersion; + } + + @Override + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException { + // Verify that the bucket exists + final File bucketDir = new File(directory, bucketId); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + bucketId); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flowId); + if (!flowDir.exists()) { + throw new UnknownResourceException("No 
Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); + } + + final File versionDir = new File(flowDir, String.valueOf(version)); + if (!versionDir.exists()) { + throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version); + } + + final File contentsFile = new File(versionDir, "flow.xml"); + + final VersionedProcessGroup processGroup; + try (final JsonParser parser = jsonFactory.createParser(contentsFile)) { + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + parser.setCodec(mapper); + processGroup = parser.readValueAs(VersionedProcessGroup.class); + } + + final Properties properties = new Properties(); + final File snapshotPropsFile = new File(versionDir, "snapshot.properties"); + try (final InputStream in = new FileInputStream(snapshotPropsFile)) { + properties.load(in); + } + + final String comments = properties.getProperty("comments"); + final String flowName = properties.getProperty("name"); + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(bucketId); + snapshotMetadata.setComments(comments); + snapshotMetadata.setFlowIdentifier(flowId); + snapshotMetadata.setFlowName(flowName); + snapshotMetadata.setTimestamp(versionDir.lastModified()); // stored snapshot time, matching getVersionedFlow, rather than the time of this read + snapshotMetadata.setVersion(version); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setFlowContents(processGroup); + snapshot.setSnapshotMetadata(snapshotMetadata); + + return snapshot; + } + + @Override + public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException { + // Verify that the bucket exists + final File bucketDir = new File(directory, bucketId); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " +
bucketId); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flowId); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); + } + + final File flowPropsFile = new File(flowDir, "flow.properties"); + final Properties flowProperties = new Properties(); + try (final InputStream in = new FileInputStream(flowPropsFile)) { + flowProperties.load(in); + } + + final VersionedFlow flow = new VersionedFlow(); + flow.setBucketIdentifier(bucketId); + flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created"))); + flow.setDescription(flowProperties.getProperty("description")); + flow.setIdentifier(flowId); + flow.setModifiedTimestamp(flowDir.lastModified()); + flow.setName(flowProperties.getProperty("name")); + + final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion()); + + final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator); + flow.setSnapshotMetadata(snapshotMetadataSet); + + final File[] versionDirs = flowDir.listFiles(candidate -> candidate.isDirectory() && candidate.getName().matches("\\d+")); // list only numbered version directories so the count below excludes flow.properties + flow.setVersionCount(versionDirs.length); + + for (final File file : versionDirs) { + if (!file.isDirectory()) { + continue; + } + + int version; + try { + version = Integer.parseInt(file.getName()); + } catch (final NumberFormatException nfe) { + // not a version. skip.
+ continue; + } + + final File snapshotPropsFile = new File(file, "snapshot.properties"); + final Properties snapshotProperties = new Properties(); + try (final InputStream in = new FileInputStream(snapshotPropsFile)) { + snapshotProperties.load(in); + } + + final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); + metadata.setBucketIdentifier(bucketId); + metadata.setComments(snapshotProperties.getProperty("comments")); + metadata.setFlowIdentifier(flowId); + metadata.setFlowName(snapshotProperties.getProperty("name")); + metadata.setTimestamp(file.lastModified()); + metadata.setVersion(version); + + snapshotMetadataSet.add(metadata); + } + + return flow; + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public void setDescription(String description) { + this.description = description; + } + + @Override + public void setURL(String url) { + this.url = url; + } + + @Override + public void setName(String name) { + this.name = name; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java deleted file mode 100644 index 2cc39c6..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java +++ /dev/null @@ -1,435 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - 
* contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.registry.flow; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.registry.bucket.Bucket; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; - -/** - * A simple file-based implementation of a Flow Registry Client. Rather than interacting - * with an actual Flow Registry, this implementation simply reads flows from disk and writes - * them to disk. It is not meant for any production use but is available for testing purposes. 
- */ -public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry { - private final File directory; - private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>(); - private final JsonFactory jsonFactory = new JsonFactory(); - - public FileBasedFlowRegistryClient(final File directory) throws IOException { - if (!directory.exists() && !directory.mkdirs()) { - throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry"); - } - - this.directory = directory; - recoverBuckets(); - } - - private void recoverBuckets() throws IOException { - final File[] bucketDirs = directory.listFiles(); - if (bucketDirs == null) { - throw new IOException("Could not get listing of directory " + directory); - } - - for (final File bucketDir : bucketDirs) { - final File[] flowDirs = bucketDir.listFiles(); - if (flowDirs == null) { - throw new IOException("Could not get listing of directory " + bucketDir); - } - - final Set<String> flowNames = new HashSet<>(); - for (final File flowDir : flowDirs) { - final File propsFile = new File(flowDir, "flow.properties"); - if (!propsFile.exists()) { - continue; - } - - final Properties properties = new Properties(); - try (final InputStream in = new FileInputStream(propsFile)) { - properties.load(in); - } - - final String flowName = properties.getProperty("name"); - if (flowName == null) { - continue; - } - - flowNames.add(flowName); - } - - if (!flowNames.isEmpty()) { - flowNamesByBucket.put(bucketDir.getName(), flowNames); - } - } - } - - @Override - public FlowRegistry getFlowRegistry(final String registryId) { - if (!"default".equals(registryId)) { - return null; - } - - return this; - } - - @Override - public String getURL() { - return directory.toURI().toString(); - } - - @Override - public String getName() { - return "Local Registry"; - } - - @Override - public Set<Bucket> getBuckets(NiFiUser user) throws IOException { - final Set<Bucket> buckets = 
new HashSet<>(); - - final File[] bucketDirs = directory.listFiles(); - if (bucketDirs == null) { - throw new IOException("Could not get listing of directory " + directory); - } - - for (final File bucketDirectory : bucketDirs) { - final String bucketIdentifier = bucketDirectory.getName(); - final long creation = bucketDirectory.lastModified(); - - final Bucket bucket = new Bucket(); - bucket.setIdentifier(bucketIdentifier); - bucket.setName("Bucket '" + bucketIdentifier + "'"); - bucket.setCreatedTimestamp(creation); - - buckets.add(bucket); - } - - return buckets; - } - - @Override - public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException { - Objects.requireNonNull(flow); - Objects.requireNonNull(flow.getBucketIdentifier()); - Objects.requireNonNull(flow.getName()); - - // Verify that bucket exists - final File bucketDir = new File(directory, flow.getBucketIdentifier()); - if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); - } - - // Verify that there is no flow with the same name in that bucket - final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier()); - if (flowNames != null && flowNames.contains(flow.getName())) { - throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier()); - } - - final String flowIdentifier = UUID.randomUUID().toString(); - final File flowDir = new File(bucketDir, flowIdentifier); - if (!flowDir.mkdirs()) { - throw new IOException("Failed to create directory " + flowDir + " for new Flow"); - } - - final File propertiesFile = new File(flowDir, "flow.properties"); - - final Properties flowProperties = new Properties(); - flowProperties.setProperty("name", flow.getName()); - flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp())); - 
flowProperties.setProperty("description", flow.getDescription()); - flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp())); - - try (final OutputStream out = new FileOutputStream(propertiesFile)) { - flowProperties.store(out, null); - } - - final VersionedFlow response = new VersionedFlow(); - response.setBucketIdentifier(flow.getBucketIdentifier()); - response.setCreatedTimestamp(flow.getCreatedTimestamp()); - response.setDescription(flow.getDescription()); - response.setIdentifier(flowIdentifier); - response.setModifiedTimestamp(flow.getModifiedTimestamp()); - response.setName(flow.getName()); - - return response; - } - - @Override - public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments) - throws IOException, UnknownResourceException { - Objects.requireNonNull(flow); - Objects.requireNonNull(flow.getBucketIdentifier()); - Objects.requireNonNull(flow.getName()); - Objects.requireNonNull(snapshot); - - // Verify that the bucket exists - final File bucketDir = new File(directory, flow.getBucketIdentifier()); - if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flow.getIdentifier()); - if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier()); - } - - final File[] versionDirs = flowDir.listFiles(); - if (versionDirs == null) { - throw new IOException("Unable to perform listing of directory " + flowDir); - } - - int maxVersion = 0; - for (final File versionDir : versionDirs) { - final String versionName = versionDir.getName(); - - final int version; - try { - version = Integer.parseInt(versionName); - } catch (final NumberFormatException nfe) { - continue; - } - - if (version 
> maxVersion) { - maxVersion = version; - } - } - - final int snapshotVersion = maxVersion + 1; - final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion)); - if (!snapshotDir.mkdir()) { - throw new IOException("Could not create directory " + snapshotDir); - } - - final File contentsFile = new File(snapshotDir, "flow.xml"); - - try (final OutputStream out = new FileOutputStream(contentsFile); - final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) { - generator.setCodec(new ObjectMapper()); - generator.setPrettyPrinter(new DefaultPrettyPrinter()); - generator.writeObject(snapshot); - } - - final Properties snapshotProperties = new Properties(); - snapshotProperties.setProperty("comments", comments); - snapshotProperties.setProperty("name", flow.getName()); - final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties"); - try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) { - snapshotProperties.store(out, null); - } - - final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); - snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier()); - snapshotMetadata.setComments(comments); - snapshotMetadata.setFlowIdentifier(flow.getIdentifier()); - snapshotMetadata.setFlowName(flow.getName()); - snapshotMetadata.setTimestamp(System.currentTimeMillis()); - snapshotMetadata.setVersion(snapshotVersion); - - final VersionedFlowSnapshot response = new VersionedFlowSnapshot(); - response.setSnapshotMetadata(snapshotMetadata); - response.setFlowContents(snapshot); - return response; - } - - @Override - public Set<String> getRegistryIdentifiers() { - return Collections.singleton("default"); - } - - @Override - public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException { - // Verify that the bucket exists - final File bucketDir = new File(directory, bucketId); - if (!bucketDir.exists()) { - throw new 
UnknownResourceException("No bucket exists with ID " + bucketId); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flowId); - if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); - } - - final File[] versionDirs = flowDir.listFiles(); - if (versionDirs == null) { - throw new IOException("Unable to perform listing of directory " + flowDir); - } - - int maxVersion = 0; - for (final File versionDir : versionDirs) { - final String versionName = versionDir.getName(); - - final int version; - try { - version = Integer.parseInt(versionName); - } catch (final NumberFormatException nfe) { - continue; - } - - if (version > maxVersion) { - maxVersion = version; - } - } - - return maxVersion; - } - - @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException { - // Verify that the bucket exists - final File bucketDir = new File(directory, bucketId); - if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + bucketId); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flowId); - if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); - } - - final File versionDir = new File(flowDir, String.valueOf(version)); - if (!versionDir.exists()) { - throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version); - } - - final File contentsFile = new File(versionDir, "flow.xml"); - - final VersionedProcessGroup processGroup; - try (final JsonParser parser = jsonFactory.createParser(contentsFile)) { - final ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - 
parser.setCodec(mapper); - processGroup = parser.readValueAs(VersionedProcessGroup.class); - } - - final Properties properties = new Properties(); - final File snapshotPropsFile = new File(versionDir, "snapshot.properties"); - try (final InputStream in = new FileInputStream(snapshotPropsFile)) { - properties.load(in); - } - - final String comments = properties.getProperty("comments"); - final String flowName = properties.getProperty("name"); - - final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); - snapshotMetadata.setBucketIdentifier(bucketId); - snapshotMetadata.setComments(comments); - snapshotMetadata.setFlowIdentifier(flowId); - snapshotMetadata.setFlowName(flowName); - snapshotMetadata.setTimestamp(System.currentTimeMillis()); - snapshotMetadata.setVersion(version); - - final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); - snapshot.setFlowContents(processGroup); - snapshot.setSnapshotMetadata(snapshotMetadata); - - return snapshot; - } - - @Override - public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException { - // Verify that the bucket exists - final File bucketDir = new File(directory, bucketId); - if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + bucketId); - } - - // Verify that the flow exists - final File flowDir = new File(bucketDir, flowId); - if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); - } - - final File flowPropsFile = new File(flowDir, "flow.properties"); - final Properties flowProperties = new Properties(); - try (final InputStream in = new FileInputStream(flowPropsFile)) { - flowProperties.load(in); - } - - final VersionedFlow flow = new VersionedFlow(); - flow.setBucketIdentifier(bucketId); - flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created"))); - 
flow.setDescription(flowProperties.getProperty("description")); - flow.setIdentifier(flowId); - flow.setModifiedTimestamp(flowDir.lastModified()); - flow.setName(flowProperties.getProperty("name")); - - final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion()); - - final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator); - flow.setSnapshotMetadata(snapshotMetadataSet); - - final File[] versionDirs = flowDir.listFiles(); - for (final File file : versionDirs) { - if (!file.isDirectory()) { - continue; - } - - int version; - try { - version = Integer.parseInt(file.getName()); - } catch (final NumberFormatException nfe) { - // not a version. skip. - continue; - } - - final File snapshotPropsFile = new File(file, "snapshot.properties"); - final Properties snapshotProperties = new Properties(); - try (final InputStream in = new FileInputStream(snapshotPropsFile)) { - snapshotProperties.load(in); - } - - final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); - metadata.setBucketIdentifier(bucketId); - metadata.setComments(snapshotProperties.getProperty("comments")); - metadata.setFlowIdentifier(flowId); - metadata.setFlowName(snapshotProperties.getProperty("name")); - metadata.setTimestamp(file.lastModified()); - metadata.setVersion(version); - - snapshotMetadataSet.add(metadata); - } - - return flow; - } -}
