http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index ec32cc1..282c50d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -16,7 +16,30 @@
  */
 package org.apache.nifi.groups;
 
-import com.google.common.collect.Sets;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -29,6 +52,7 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
@@ -40,6 +64,7 @@ import org.apache.nifi.connectable.LocalPort;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Positionable;
+import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
@@ -49,22 +74,58 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceReference;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.flow.Bundle;
+import org.apache.nifi.registry.flow.ConnectableComponent;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedControllerService;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFunnel;
+import org.apache.nifi.registry.flow.VersionedLabel;
+import org.apache.nifi.registry.flow.VersionedPort;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.Revision;
@@ -72,23 +133,6 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 public final class StandardProcessGroup implements ProcessGroup {
 
     private final String id;
@@ -96,6 +140,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     private final AtomicReference<String> name;
     private final AtomicReference<Position> position;
     private final AtomicReference<String> comments;
+    private final AtomicReference<String> versionedComponentId = new 
AtomicReference<>();
+    private final AtomicReference<StandardVersionControlInformation> 
versionControlInfo = new AtomicReference<>();
 
     private final StandardProcessScheduler scheduler;
     private final ControllerServiceProvider controllerServiceProvider;
@@ -782,7 +828,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             removed = true;
             LOG.info("{} removed from flow", processor);
-
         } finally {
             if (removed) {
                 try {
@@ -1935,7 +1980,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             removed = true;
             LOG.info("{} removed from {}", service, this);
-
         } finally {
             if (removed) {
                 try {
@@ -2234,7 +2278,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
     @Override
     public Set<Positionable> findAllPositionables() {
-        Set<Positionable> positionables = Sets.newHashSet();
+        final Set<Positionable> positionables = new HashSet<>();
         positionables.addAll(findAllConnectables(this, true));
         List<ProcessGroup> allProcessGroups = findAllProcessGroups();
         positionables.addAll(allProcessGroups);
@@ -2371,7 +2415,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 connection.verifyCanDelete();
             }
 
-            for(final ControllerServiceNode cs : controllerServices.values()) {
+            for (final ControllerServiceNode cs : controllerServices.values()) 
{
                 cs.verifyCanDelete();
             }
 
@@ -2647,6 +2691,31 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     @Override
+    public Optional<String> getVersionedComponentId() {
+        // The backing reference holds null until an ID has been assigned, so wrap it null-safely.
+        final String currentId = versionedComponentId.get();
+        return Optional.ofNullable(currentId);
+    }
+
+    /**
+     * Assigns, clears, or re-confirms the Versioned Component ID of this group while holding the write lock.
+     *
+     * @param componentId the ID to assign, or null to clear the current ID
+     * @throws IllegalStateException if a non-null ID is already set and a different non-null ID is supplied
+     */
+    @Override
+    public void setVersionedComponentId(final String componentId) {
+        writeLock.lock();
+        try {
+            final String existingId = versionedComponentId.get();
+
+            // Re-applying the ID that is already in effect is a no-op.
+            if (existingId != null && existingId.equals(componentId)) {
+                return;
+            }
+
+            // Replacing one non-null ID with a different non-null ID is not allowed.
+            if (existingId != null && componentId != null) {
+                throw new IllegalStateException(this + " is already under version control with a different Versioned Component ID");
+            }
+
+            // Remaining cases: no ID was set yet (assign whatever was given), or the caller is clearing the ID.
+            versionedComponentId.set(componentId);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
     public Set<ConfiguredComponent> getComponentsAffectedByVariable(final 
String variableName) {
         final Set<ConfiguredComponent> affected = new HashSet<>();
 
@@ -2736,4 +2805,1072 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
+    /**
+     * Returns the version control information currently held for this group.
+     *
+     * @return the value of the versionControlInfo reference, or null if none has been set
+     */
+    @Override
+    public VersionControlInformation getVersionControlInformation() {
+        final StandardVersionControlInformation currentInfo = versionControlInfo.get();
+        return currentInfo;
+    }
+
+    /**
+     * Stores version control information for this group and applies the supplied Versioned Component IDs.
+     *
+     * The values of the given object are copied into a StandardVersionControlInformation whose getModified()
+     * is overridden to delegate to this group's isModified(). The component IDs are applied and the new
+     * information is stored while holding the write lock.
+     *
+     * @param versionControlInformation the information to copy; it is dereferenced without a null check
+     * @param versionedComponentIds mapping of component identifier to Versioned Component ID; a null or empty map leaves component IDs untouched
+     */
+    @Override
+    public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
+        final String registryId = versionControlInformation.getRegistryIdentifier();
+        final String bucketId = versionControlInformation.getBucketIdentifier();
+        final String flowId = versionControlInformation.getFlowIdentifier();
+        final int version = versionControlInformation.getVersion();
+
+        final StandardVersionControlInformation trackedInfo = new StandardVersionControlInformation(registryId, bucketId, flowId, version,
+            versionControlInformation.getFlowSnapshot(),
+            versionControlInformation.getModified().orElse(null),
+            versionControlInformation.getCurrent().orElse(null)) {
+
+            // Report the modified flag from the owning group instead of the value captured at construction.
+            @Override
+            public Optional<Boolean> getModified() {
+                return StandardProcessGroup.this.isModified();
+            }
+        };
+
+        writeLock.lock();
+        try {
+            updateVersionedComponentIds(this, versionedComponentIds);
+            versionControlInfo.set(trackedInfo);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Applies a mapping of component identifier to Versioned Component ID to the given group, to every
+     * component directly inside it, and then recursively to each child group. A component whose identifier
+     * has no entry in the map is passed null.
+     *
+     * @param processGroup the group whose components should be updated
+     * @param versionedComponentIds mapping of component identifier to Versioned Component ID; if null or empty, nothing is changed
+     */
+    private void updateVersionedComponentIds(final ProcessGroup processGroup, final Map<String, String> versionedComponentIds) {
+        final boolean nothingToApply = versionedComponentIds == null || versionedComponentIds.isEmpty();
+        if (nothingToApply) {
+            return;
+        }
+
+        // Single lookup used for every component below.
+        final Function<String, String> versionedIdFor = versionedComponentIds::get;
+
+        processGroup.setVersionedComponentId(versionedIdFor.apply(processGroup.getIdentifier()));
+
+        processGroup.getConnections().forEach(connection -> connection.setVersionedComponentId(versionedIdFor.apply(connection.getIdentifier())));
+        processGroup.getProcessors().forEach(processor -> processor.setVersionedComponentId(versionedIdFor.apply(processor.getIdentifier())));
+        processGroup.getInputPorts().forEach(inputPort -> inputPort.setVersionedComponentId(versionedIdFor.apply(inputPort.getIdentifier())));
+        processGroup.getOutputPorts().forEach(outputPort -> outputPort.setVersionedComponentId(versionedIdFor.apply(outputPort.getIdentifier())));
+        processGroup.getLabels().forEach(label -> label.setVersionedComponentId(versionedIdFor.apply(label.getIdentifier())));
+        processGroup.getFunnels().forEach(funnel -> funnel.setVersionedComponentId(versionedIdFor.apply(funnel.getIdentifier())));
+        processGroup.getControllerServices(false).forEach(service -> service.setVersionedComponentId(versionedIdFor.apply(service.getIdentifier())));
+
+        // A remote process group carries its own ports, which are updated together with the group itself.
+        processGroup.getRemoteProcessGroups().forEach(remoteGroup -> {
+            remoteGroup.setVersionedComponentId(versionedIdFor.apply(remoteGroup.getIdentifier()));
+            remoteGroup.getInputPorts().forEach(remotePort -> remotePort.setVersionedComponentId(versionedIdFor.apply(remotePort.getIdentifier())));
+            remoteGroup.getOutputPorts().forEach(remotePort -> remotePort.setVersionedComponentId(versionedIdFor.apply(remotePort.getIdentifier())));
+        });
+
+        // Descend into child groups using the same mapping.
+        processGroup.getProcessGroups().forEach(child -> updateVersionedComponentIds(child, versionedComponentIds));
+    }
+
+
+    /**
+     * Refreshes this group's version control state from the Flow Registry it was placed under version control with.
+     *
+     * Does nothing if the group has no version control information. Otherwise the registry is asked for the latest
+     * version of the flow and the 'current' flag is set accordingly. If no flow snapshot has been stored yet, the
+     * snapshot for the locally recorded version is fetched and stored. Failures to reach the registry are logged
+     * and not propagated.
+     *
+     * @param flowRegistryClient client used to look up the Flow Registry by the identifier held in the version control information
+     */
+    @Override
+    public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
+        final StandardVersionControlInformation vci = versionControlInfo.get();
+        if (vci == null) {
+            return;
+        }
+
+        final String registryId = vci.getRegistryIdentifier();
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry == null) {
+            LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+                + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
+            return;
+        }
+
+        try {
+            final int latestVersion = flowRegistry.getLatestVersion(vci.getBucketIdentifier(), vci.getFlowIdentifier());
+
+            if (latestVersion == vci.getVersion()) {
+                LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
+                vci.setCurrent(true);
+            } else {
+                vci.setCurrent(false);
+                LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
+                    this, vci.getVersion(), latestVersion);
+            }
+        } catch (final IOException | UnknownResourceException e) {
+            // A failure here is not fatal: the snapshot retrieval below is still attempted.
+            LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
+        }
+
+        final VersionedProcessGroup snapshot = vci.getFlowSnapshot();
+        if (snapshot == null) {
+            // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
+            // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
+            try {
+                final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion());
+                final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
+                vci.setFlowSnapshot(registryFlow);
+            } catch (final IOException | UnknownResourceException e) {
+                // Arguments are passed as varargs with the exception last. Wrapping them in an Object[] followed by the
+                // exception makes SLF4J treat the array as a single argument and leaves the other placeholders unfilled.
+                LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
+                    this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
+            }
+        }
+    }
+
+
+    /**
+     * Updates this group to match the proposed flow snapshot while holding the write lock.
+     *
+     * The group is first verified via verifyCanUpdate, then mapped to its versioned form and compared against the
+     * proposed contents. The identifiers of all components with a difference other than a position change are
+     * collected and handed to updateProcessGroup together with the proposed contents.
+     *
+     * @param proposedSnapshot the snapshot whose contents this group should be updated to
+     * @param componentIdSeed seed passed through to updateProcessGroup
+     * @param verifyNotDirty passed through to verifyCanUpdate
+     * @throws RuntimeException wrapping a ProcessorInstantiationException thrown during the update
+     */
+    @Override
+    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) {
+        writeLock.lock();
+        try {
+            // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also
+            verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
+
+            final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper();
+            final VersionedProcessGroup localContents = flowMapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
+
+            final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
+            final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
+
+            final FlowComparator comparator = new StandardFlowComparator(localFlow, remoteFlow);
+            final FlowComparison comparison = comparator.compare();
+
+            // Position-only changes do not mark a component as updated; prefer component A's identifier and fall back to component B's.
+            final Set<String> updatedVersionedComponentIds = comparison.getDifferences().stream()
+                .filter(difference -> DifferenceType.POSITION_CHANGED != difference.getDifferenceType())
+                .map(difference -> {
+                    if (difference.getComponentA() == null) {
+                        return difference.getComponentB().getIdentifier();
+                    }
+                    return difference.getComponentA().getIdentifier();
+                })
+                .collect(Collectors.toSet());
+
+            final String summary = "Updating {} to {}; there are {} differences to take into account: {}";
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(summary, this, proposedSnapshot, comparison.getDifferences().size(), comparison.getDifferences());
+            } else {
+                // TODO: Remove the actual differences from the info level log. It can be extremely verbose. It is here only for testing purposes because
+                // it is much more convenient than having to remember to enable DEBUG level logging every time a full build is done.
+                LOG.info(summary, this, proposedSnapshot, comparison.getDifferences().size(), comparison.getDifferences());
+            }
+
+            // The top-level group keeps its own position, hence updatePosition is false.
+            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false);
+        } catch (final ProcessorInstantiationException pie) {
+            throw new RuntimeException(pie);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+
+    private void updateProcessGroup(final ProcessGroup group, final 
VersionedProcessGroup proposed, final String componentIdSeed,
+        final Set<String> updatedVersionedComponentIds, final boolean 
updatePosition) throws ProcessorInstantiationException {
+
+        group.setComments(proposed.getComments());
+        group.setName(proposed.getName());
+        if (updatePosition && proposed.getPosition() != null) {
+            group.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+        }
+
+        // Determine which variables have been added/removed and add/remove 
them from this group's variable registry.
+        // We don't worry about if a variable value has changed, because 
variables are designed to be 'environment specific.'
+        // As a result, once imported, we won't update variables to match the 
remote flow, but we will add any missing variables
+        // and remove any variables that are no longer part of the remote flow.
+        final Set<String> existingVariableNames = 
group.getVariableRegistry().getVariableMap().keySet().stream()
+            .map(VariableDescriptor::getName)
+            .collect(Collectors.toSet());
+
+        final Set<String> variablesRemoved = new 
HashSet<>(existingVariableNames);
+        variablesRemoved.removeAll(proposed.getVariables().keySet());
+        final Map<String, String> updatedVariableMap = new HashMap<>();
+        variablesRemoved.forEach(var -> updatedVariableMap.put(var, null));
+
+        // If any new variables exist in the proposed flow, add those to the 
variable registry.
+        for (final Map.Entry<String, String> entry : 
proposed.getVariables().entrySet()) {
+            if (!existingVariableNames.contains(entry.getKey())) {
+                updatedVariableMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        group.setVariables(updatedVariableMap);
+
+        final RemoteFlowCoordinates remoteCoordinates = 
proposed.getRemoteFlowCoordinates();
+        if (remoteCoordinates != null) {
+            final String registryId = 
flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
+            final String bucketId = remoteCoordinates.getBucketId();
+            final String flowId = remoteCoordinates.getFlowId();
+            final int version = remoteCoordinates.getVersion();
+
+            final VersionControlInformation vci = new 
StandardVersionControlInformation(registryId, bucketId, flowId, version, 
proposed, false, true);
+            group.setVersionControlInformation(vci, Collections.emptyMap());
+        }
+
+        // Child groups
+        // TODO: Need to take into account if child group is under version 
control pointing to a different Versioned Flow and if so need to handle it 
differently.
+        final Map<String, ProcessGroup> childGroupsByVersionedId = 
group.getProcessGroups().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> childGroupsRemoved = new 
HashSet<>(childGroupsByVersionedId.keySet());
+
+        for (final VersionedProcessGroup proposedChildGroup : 
proposed.getProcessGroups()) {
+            final ProcessGroup childGroup = 
childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
+
+            if (childGroup == null) {
+                final ProcessGroup added = addProcessGroup(proposedChildGroup, 
componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else {
+                updateProcessGroup(childGroup, proposedChildGroup, 
componentIdSeed, updatedVersionedComponentIds, true);
+                LOG.info("Updated {}", childGroup);
+            }
+
+            childGroupsRemoved.remove(proposedChildGroup.getIdentifier());
+        }
+
+
+        // Controller Services
+        final Map<String, ControllerServiceNode> servicesByVersionedId = 
group.getControllerServices(false).stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> controllerServicesRemoved = new 
HashSet<>(servicesByVersionedId.keySet());
+
+        for (final VersionedControllerService proposedService : 
proposed.getControllerServices()) {
+            final ControllerServiceNode service = 
servicesByVersionedId.get(proposedService.getIdentifier());
+            if (service == null) {
+                final ControllerServiceNode added = 
addControllerService(proposedService, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
+                updateControllerService(service, proposedService);
+                LOG.info("Updated {}", service);
+            }
+
+            controllerServicesRemoved.remove(proposedService.getIdentifier());
+        }
+
+
+        // Funnels
+        final Map<String, Funnel> funnelsByVersionedId = 
group.getFunnels().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> funnelsRemoved = new 
HashSet<>(funnelsByVersionedId.keySet());
+
+        for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
+            final Funnel funnel = 
funnelsByVersionedId.get(proposedFunnel.getIdentifier());
+            if (funnel == null) {
+                final Funnel added = addFunnel(proposedFunnel, 
componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
+                updateFunnel(funnel, proposedFunnel);
+                LOG.info("Updated {}", funnel);
+            } else {
+                funnel.setPosition(new 
Position(proposedFunnel.getPosition().getX(), 
proposedFunnel.getPosition().getY()));
+            }
+
+            funnelsRemoved.remove(proposedFunnel.getIdentifier());
+        }
+
+
+        // Input Ports
+        final Map<String, Port> inputPortsByVersionedId = 
group.getInputPorts().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> inputPortsRemoved = new 
HashSet<>(inputPortsByVersionedId.keySet());
+
+        for (final VersionedPort proposedPort : proposed.getInputPorts()) {
+            final Port port = 
inputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final Port added = addInputPort(proposedPort, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                updatePort(port, proposedPort);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new 
Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+
+            inputPortsRemoved.remove(proposedPort.getIdentifier());
+        }
+
+        // Output Ports
+        final Map<String, Port> outputPortsByVersionedId = 
group.getOutputPorts().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> outputPortsRemoved = new 
HashSet<>(outputPortsByVersionedId.keySet());
+
+        for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
+            final Port port = 
outputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final Port added = addOutputPort(proposedPort, 
componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                updatePort(port, proposedPort);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new 
Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+
+            outputPortsRemoved.remove(proposedPort.getIdentifier());
+        }
+
+
+        // Labels
+        final Map<String, Label> labelsByVersionedId = 
group.getLabels().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> labelsRemoved = new 
HashSet<>(labelsByVersionedId.keySet());
+
+        for (final VersionedLabel proposedLabel : proposed.getLabels()) {
+            final Label label = 
labelsByVersionedId.get(proposedLabel.getIdentifier());
+            if (label == null) {
+                final Label added = addLabel(proposedLabel, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) {
+                updateLabel(label, proposedLabel);
+                LOG.info("Updated {}", label);
+            } else {
+                label.setPosition(new 
Position(proposedLabel.getPosition().getX(), 
proposedLabel.getPosition().getY()));
+            }
+
+            labelsRemoved.remove(proposedLabel.getIdentifier());
+        }
+
+
+        // Processors
+        final Map<String, ProcessorNode> processorsByVersionedId = 
group.getProcessors().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> processorsRemoved = new 
HashSet<>(processorsByVersionedId.keySet());
+
+        for (final VersionedProcessor proposedProcessor : 
proposed.getProcessors()) {
+            final ProcessorNode processor = 
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+            if (processor == null) {
+                final ProcessorNode added = addProcessor(proposedProcessor, 
componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+                updateProcessor(processor, proposedProcessor);
+                LOG.info("Updated {}", processor);
+            } else {
+                processor.setPosition(new 
Position(proposedProcessor.getPosition().getX(), 
proposedProcessor.getPosition().getY()));
+            }
+
+            processorsRemoved.remove(proposedProcessor.getIdentifier());
+        }
+
+
+        // Remote Groups
+        final Map<String, RemoteProcessGroup> rpgsByVersionedId = 
group.getRemoteProcessGroups().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> rpgsRemoved = new 
HashSet<>(rpgsByVersionedId.keySet());
+
+        for (final VersionedRemoteProcessGroup proposedRpg : 
proposed.getRemoteProcessGroups()) {
+            final RemoteProcessGroup rpg = 
rpgsByVersionedId.get(proposedRpg.getIdentifier());
+            if (rpg == null) {
+                final RemoteProcessGroup added = 
addRemoteProcessGroup(proposedRpg, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if 
(updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
+                updateRemoteProcessGroup(rpg, proposedRpg);
+                LOG.info("Updated {}", rpg);
+            } else {
+                rpg.setPosition(new Position(proposedRpg.getPosition().getX(), 
proposedRpg.getPosition().getY()));
+            }
+
+            rpgsRemoved.remove(proposedRpg.getIdentifier());
+        }
+
+
+        // Connections
+        final Map<String, Connection> connectionsByVersionedId = 
group.getConnections().stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+        final Set<String> connectionsRemoved = new 
HashSet<>(connectionsByVersionedId.keySet());
+
+        for (final VersionedConnection proposedConnection : 
proposed.getConnections()) {
+            final Connection connection = 
connectionsByVersionedId.get(proposedConnection.getIdentifier());
+            if (connection == null) {
+                final Connection added = addConnection(proposedConnection, 
componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (!connection.getSource().isRunning() && 
!connection.getDestination().isRunning()) {
+                // If the connection needs to be updated, then the source and 
destination will already have
+                // been stopped (else, the validation above would fail). So if 
the source or the destination is running,
+                // then we know that we don't need to update the connection.
+                updateConnection(connection, proposedConnection);
+                LOG.info("Updated {}", connection);
+            }
+
+            connectionsRemoved.remove(proposedConnection.getIdentifier());
+        }
+
+        // Remove components that exist in the local flow but not the remote 
flow.
+
+        // Connections must be the first thing to remove, not the last. 
Otherwise, we will fail
+        // to remove a component if it has a connection going to it!
+        for (final String removedVersionedId : connectionsRemoved) {
+            final Connection connection = 
connectionsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", connection, group);
+            group.removeConnection(connection);
+        }
+
+        for (final String removedVersionedId : controllerServicesRemoved) {
+            final ControllerServiceNode service = 
servicesByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", service, group);
+            group.removeControllerService(service);
+        }
+
+        for (final String removedVersionedId : funnelsRemoved) {
+            final Funnel funnel = funnelsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", funnel, group);
+            group.removeFunnel(funnel);
+        }
+
+        for (final String removedVersionedId : inputPortsRemoved) {
+            final Port port = inputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeInputPort(port);
+        }
+
+        for (final String removedVersionedId : outputPortsRemoved) {
+            final Port port = outputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeOutputPort(port);
+        }
+
+        for (final String removedVersionedId : labelsRemoved) {
+            final Label label = labelsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", label, group);
+            group.removeLabel(label);
+        }
+
+        for (final String removedVersionedId : processorsRemoved) {
+            final ProcessorNode processor = 
processorsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", processor, group);
+            group.removeProcessor(processor);
+        }
+
+        for (final String removedVersionedId : rpgsRemoved) {
+            final RemoteProcessGroup rpg = 
rpgsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", rpg, group);
+            group.removeRemoteProcessGroup(rpg);
+        }
+
+        for (final String removedVersionedId : childGroupsRemoved) {
+            final ProcessGroup childGroup = 
childGroupsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", childGroup, group);
+            group.removeProcessGroup(childGroup);
+        }
+    }
+
+    protected String generateUuid(final String componentIdSeed) {
+        UUID uuid;
+        if (componentIdSeed == null) {
+            uuid = ComponentIdGenerator.generateId();
+        } else {
+            try {
+                UUID seedId = UUID.fromString(componentIdSeed);
+                uuid = new UUID(seedId.getMostSignificantBits(), 
componentIdSeed.hashCode());
+            } catch (Exception e) {
+                LOG.warn("Provided 'seed' does not represent UUID. Will not be 
able to extract most significant bits for ID generation.");
+                uuid = 
UUID.nameUUIDFromBytes(componentIdSeed.getBytes(StandardCharsets.UTF_8));
+            }
+        }
+
+        return uuid.toString();
+    }
+
+
+    /**
+     * Creates a child Process Group for the proposed versioned group, tags it with the versioned
+     * component id, adds it to this group and then passes it to {@code updateProcessGroup}.
+     *
+     * @param proposed the versioned group to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)} and to the update call
+     * @return the newly created Process Group
+     * @throws ProcessorInstantiationException propagated from the update of the new group
+     */
+    private ProcessGroup addProcessGroup(final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException {
+        final String childId = generateUuid(componentIdSeed);
+        final ProcessGroup child = flowController.createProcessGroup(childId);
+        child.setVersionedComponentId(proposed.getIdentifier());
+
+        addProcessGroup(child);
+        updateProcessGroup(child, proposed, componentIdSeed, Collections.emptySet(), true);
+
+        return child;
+    }
+
+    /**
+     * Copies the configuration of the proposed versioned connection onto an existing connection:
+     * bend points, destination, label index, name, selected relationships, z-index, and the queue's
+     * back pressure thresholds, FlowFile expiration and prioritizers.
+     *
+     * @param connection the local connection to update
+     * @param proposed the versioned connection holding the desired configuration
+     * @throws RuntimeException if one of the proposed prioritizers cannot be created; the underlying
+     *             failure is attached as the cause
+     */
+    private void updateConnection(final Connection connection, final VersionedConnection proposed) {
+        connection.setBendPoints(proposed.getBends().stream()
+            .map(pos -> new Position(pos.getX(), pos.getY()))
+            .collect(Collectors.toList()));
+
+        connection.setDestination(getConnectable(proposed.getDestination()));
+        connection.setLabelIndex(proposed.getLabelIndex());
+        connection.setName(proposed.getName());
+        connection.setRelationships(proposed.getSelectedRelationships().stream()
+            .map(name -> new Relationship.Builder().name(name).build())
+            .collect(Collectors.toSet()));
+        connection.setZIndex(proposed.getzIndex());
+
+        final FlowFileQueue queue = connection.getFlowFileQueue();
+        queue.setBackPressureDataSizeThreshold(proposed.getBackPressureDataSizeThreshold());
+        queue.setBackPressureObjectThreshold(proposed.getBackPressureObjectThreshold());
+        queue.setFlowFileExpiration(proposed.getFlowFileExpiration());
+
+        final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers().stream()
+            .map(prioritizerName -> {
+                try {
+                    return flowController.createPrioritizer(prioritizerName);
+                } catch (final Exception e) {
+                    // Attach the original failure as the cause so the reason the prioritizer could not be created is not lost.
+                    throw new RuntimeException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier(), e);
+                }
+            })
+            .collect(Collectors.toList());
+
+        queue.setPriorities(prioritizers);
+    }
+
+    /**
+     * Creates a connection in this group for the proposed versioned connection, tags it with the
+     * versioned component id, adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned connection to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created connection
+     * @throws IllegalArgumentException if the proposed source or destination cannot be resolved
+     */
+    private Connection addConnection(final VersionedConnection proposed, final String componentIdSeed) {
+        final Connectable source = getConnectable(proposed.getSource());
+        if (source == null) {
+            // Report the identifier of the missing source, not the identifier of the connection itself.
+            throw new IllegalArgumentException("Connection has a source with identifier " + proposed.getSource().getId()
+                + " but no component could be found in the Process Group with a corresponding identifier");
+        }
+
+        final Connectable destination = getConnectable(proposed.getDestination());
+        if (destination == null) {
+            // Report the identifier of the missing destination, not the identifier of the connection itself.
+            throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId()
+                + " but no component could be found in the Process Group with a corresponding identifier");
+        }
+
+        final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships());
+        connection.setVersionedComponentId(proposed.getIdentifier());
+        addConnection(connection);
+        updateConnection(connection, proposed);
+
+        return connection;
+    }
+
+    /**
+     * Resolves a versioned connectable reference to the matching local component in this group by
+     * comparing versioned component ids.
+     *
+     * @param connectableComponent the versioned reference (type, id and, for remote ports, group id)
+     * @return the matching local component, or {@code null} if none matches or the type is not handled
+     * @throws IllegalArgumentException if a remote port is referenced but no Remote Process Group with
+     *             the referenced group id exists in this group
+     */
+    private Connectable getConnectable(final ConnectableComponent connectableComponent) {
+        final String id = connectableComponent.getId();
+
+        switch (connectableComponent.getType()) {
+            case FUNNEL:
+                return getFunnels().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case INPUT_PORT:
+                return getInputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case OUTPUT_PORT:
+                return getOutputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case PROCESSOR:
+                return getProcessors().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case REMOTE_INPUT_PORT: {
+                final String rpgId = connectableComponent.getGroupId();
+                // The Remote Process Group is identified by the group id; the port id is only matched against the group's ports below.
+                final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
+                    .findAny();
+
+                if (!rpgOption.isPresent()) {
+                    throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID "
+                        + rpgId + " but could not find a Remote Process Group corresponding to that ID");
+                }
+
+                final RemoteProcessGroup rpg = rpgOption.get();
+                return rpg.getInputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            }
+            case REMOTE_OUTPUT_PORT: {
+                final String rpgId = connectableComponent.getGroupId();
+                // The Remote Process Group is identified by the group id; the port id is only matched against the group's ports below.
+                final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
+                    .findAny();
+
+                if (!rpgOption.isPresent()) {
+                    throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID "
+                        + rpgId + " but could not find a Remote Process Group corresponding to that ID");
+                }
+
+                final RemoteProcessGroup rpg = rpgOption.get();
+                return rpg.getOutputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Applies the proposed annotation data, comments, name and properties to the given Controller
+     * Service, and reloads the service when the proposed bundle differs from its current bundle.
+     *
+     * @param service the local Controller Service to update
+     * @param proposed the versioned Controller Service holding the desired configuration
+     */
+    private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
+        service.setAnnotationData(proposed.getAnnotationData());
+        service.setComments(proposed.getComments());
+        service.setName(proposed.getName());
+
+        final Map<String, String> mergedProperties = populatePropertiesMap(service.getProperties(), proposed.getProperties());
+        service.setProperties(mergedProperties);
+
+        // Nothing more to do when the service already runs from the proposed bundle.
+        if (isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
+            return;
+        }
+
+        final BundleCoordinate proposedCoordinate = toCoordinate(proposed.getBundle());
+        final List<PropertyDescriptor> currentDescriptors = new ArrayList<>(service.getProperties().keySet());
+        final Set<URL> classpathUrls = service.getAdditionalClasspathResources(currentDescriptors);
+        flowController.reload(service, proposed.getType(), proposedCoordinate, classpathUrls);
+    }
+
+    /**
+     * Tells whether a bundle coordinate and a versioned bundle describe the same bundle, i.e. their
+     * group, artifact id and version all match.
+     *
+     * @param coordinate the local bundle coordinate
+     * @param bundle the versioned bundle
+     * @return {@code true} only if group, artifact and version are all equal
+     */
+    private boolean isEqual(final BundleCoordinate coordinate, final Bundle bundle) {
+        return bundle.getGroup().equals(coordinate.getGroup())
+            && bundle.getArtifact().equals(coordinate.getId())
+            && bundle.getVersion().equals(coordinate.getVersion());
+    }
+
+    private BundleCoordinate toCoordinate(final Bundle bundle) {
+        return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), 
bundle.getVersion());
+    }
+
+    /**
+     * Creates a Controller Service for the proposed versioned service, tags it with the versioned
+     * component id, adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned Controller Service to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created Controller Service node
+     */
+    private ControllerServiceNode addControllerService(final VersionedControllerService proposed, final String componentIdSeed) {
+        final String serviceType = proposed.getType();
+        final String serviceId = generateUuid(componentIdSeed);
+        final BundleCoordinate bundleCoordinate = toCoordinate(proposed.getBundle());
+
+        // Created with no additional classpath URLs and flagged as being added for the first time.
+        final Set<URL> noAdditionalUrls = Collections.emptySet();
+        final ControllerServiceNode created = flowController.createControllerService(serviceType, serviceId, bundleCoordinate, noAdditionalUrls, true);
+        created.setVersionedComponentId(proposed.getIdentifier());
+
+        addControllerService(created);
+        updateControllerService(created, proposed);
+
+        return created;
+    }
+
+    private void updateFunnel(final Funnel funnel, final VersionedFunnel 
proposed) {
+        funnel.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+    }
+
+    /**
+     * Creates a funnel for the proposed versioned funnel, tags it with the versioned component id,
+     * adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned funnel to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created funnel
+     */
+    private Funnel addFunnel(final VersionedFunnel proposed, final String componentIdSeed) {
+        final String funnelId = generateUuid(componentIdSeed);
+        final Funnel created = flowController.createFunnel(funnelId);
+        created.setVersionedComponentId(proposed.getIdentifier());
+
+        addFunnel(created);
+        updateFunnel(created, proposed);
+        return created;
+    }
+
+    private void updatePort(final Port port, final VersionedPort proposed) {
+        port.setComments(proposed.getComments());
+        port.setName(proposed.getName());
+        port.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+    }
+
+    /**
+     * Creates a local input port for the proposed versioned port, tags it with the versioned
+     * component id, adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned port to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created input port
+     */
+    private Port addInputPort(final VersionedPort proposed, final String componentIdSeed) {
+        final String portId = generateUuid(componentIdSeed);
+        final Port created = flowController.createLocalInputPort(portId, proposed.getName());
+        created.setVersionedComponentId(proposed.getIdentifier());
+
+        addInputPort(created);
+        updatePort(created, proposed);
+        return created;
+    }
+
+    /**
+     * Creates a local output port for the proposed versioned port, tags it with the versioned
+     * component id, adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned port to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created output port
+     */
+    private Port addOutputPort(final VersionedPort proposed, final String componentIdSeed) {
+        // Must be created as an output port: it is registered with addOutputPort below.
+        final Port port = flowController.createLocalOutputPort(generateUuid(componentIdSeed), proposed.getName());
+        port.setVersionedComponentId(proposed.getIdentifier());
+        addOutputPort(port);
+        updatePort(port, proposed);
+
+        return port;
+    }
+
+    /**
+     * Creates a label for the proposed versioned label, tags it with the versioned component id,
+     * adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned label to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created label
+     */
+    private Label addLabel(final VersionedLabel proposed, final String componentIdSeed) {
+        final String labelId = generateUuid(componentIdSeed);
+        final Label created = flowController.createLabel(labelId, proposed.getLabel());
+        created.setVersionedComponentId(proposed.getIdentifier());
+
+        addLabel(created);
+        updateLabel(created, proposed);
+        return created;
+    }
+
+    private void updateLabel(final Label label, final VersionedLabel proposed) 
{
+        label.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+        label.setSize(new Size(proposed.getWidth(), proposed.getHeight()));
+        label.setStyle(proposed.getStyle());
+        label.setValue(proposed.getLabel());
+    }
+
+    /**
+     * Creates a processor for the proposed versioned processor, tags it with the versioned component
+     * id, adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned processor to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created processor node
+     * @throws ProcessorInstantiationException if the processor cannot be created or updated
+     */
+    private ProcessorNode addProcessor(final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
+        final BundleCoordinate bundleCoordinate = toCoordinate(proposed.getBundle());
+        final String processorType = proposed.getType();
+        final String processorId = generateUuid(componentIdSeed);
+
+        final ProcessorNode created = flowController.createProcessor(processorType, processorId, bundleCoordinate, true);
+        created.setVersionedComponentId(proposed.getIdentifier());
+
+        addProcessor(created);
+        updateProcessor(created, proposed);
+        return created;
+    }
+
+    private void updateProcessor(final ProcessorNode processor, final 
VersionedProcessor proposed) throws ProcessorInstantiationException {
+        processor.setAnnotationData(proposed.getAnnotationData());
+        
processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
+        processor.setComments(proposed.getComments());
+        
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
+        
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
+        processor.setName(proposed.getName());
+        processor.setPenalizationPeriod(proposed.getPenaltyDuration());
+        
processor.setProperties(populatePropertiesMap(processor.getProperties(), 
proposed.getProperties()));
+        processor.setRunDuration(proposed.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
+        processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
+        
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
+        processor.setStyle(proposed.getStyle());
+        processor.setYieldPeriod(proposed.getYieldDuration());
+        processor.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+
+        
processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream()
+            .map(relName -> processor.getRelationship(relName))
+            .collect(Collectors.toSet()));
+
+        if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
+            final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
+            final List<PropertyDescriptor> descriptors = new 
ArrayList<>(processor.getProperties().keySet());
+            final Set<URL> additionalUrls = 
processor.getAdditionalClasspathResources(descriptors);
+            flowController.reload(processor, proposed.getType(), 
newBundleCoordinate, additionalUrls);
+        }
+    }
+
+    /**
+     * Builds the full property map to apply to a component: every currently known property name is
+     * first mapped to {@code null}, then the proposed values are laid over the top. Properties that
+     * exist locally but are absent from the proposal therefore end up mapped to {@code null}.
+     *
+     * @param currentProperties the component's current properties, keyed by descriptor
+     * @param proposedProperties the proposed property values, keyed by property name
+     * @return a new map holding the merged result
+     */
+    private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
+        final Map<String, String> merged = new HashMap<>();
+        currentProperties.keySet().forEach(descriptor -> merged.put(descriptor.getName(), null));
+        merged.putAll(proposedProperties);
+        return merged;
+    }
+
+    /**
+     * Creates a Remote Process Group for the proposed versioned group, tags it with the versioned
+     * component id, adds it to this group and then applies the proposed configuration.
+     *
+     * @param proposed the versioned Remote Process Group to create locally
+     * @param componentIdSeed seed forwarded to {@link #generateUuid(String)}
+     * @return the newly created Remote Process Group
+     */
+    private RemoteProcessGroup addRemoteProcessGroup(final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
+        final String rpgId = generateUuid(componentIdSeed);
+        final RemoteProcessGroup created = flowController.createRemoteProcessGroup(rpgId, proposed.getTargetUris());
+        created.setVersionedComponentId(proposed.getIdentifier());
+
+        addRemoteProcessGroup(created);
+        updateRemoteProcessGroup(created, proposed);
+        return created;
+    }
+
+    private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final 
VersionedRemoteProcessGroup proposed) {
+        rpg.setComments(proposed.getComments());
+        rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
+        rpg.setInputPorts(proposed.getInputPorts().stream()
+            .map(port -> createPortDescriptor(port))
+            .collect(Collectors.toSet()));
+        rpg.setName(proposed.getName());
+        rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
+        rpg.setOutputPorts(proposed.getOutputPorts().stream()
+            .map(port -> createPortDescriptor(port))
+            .collect(Collectors.toSet()));
+        rpg.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+        rpg.setProxyHost(proposed.getProxyHost());
+        rpg.setProxyPort(proposed.getProxyPort());
+        rpg.setProxyUser(proposed.getProxyUser());
+        
rpg.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(proposed.getTransportProtocol()));
+        rpg.setYieldDuration(proposed.getYieldDuration());
+    }
+
+    /**
+     * Builds a port descriptor for a Remote Process Group from a versioned remote group port. The
+     * descriptor receives a freshly generated random id on every call.
+     *
+     * @param proposed the versioned remote group port
+     * @return a descriptor populated from the proposed port
+     */
+    private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed) {
+        final StandardRemoteProcessGroupPortDescriptor portDescriptor = new StandardRemoteProcessGroupPortDescriptor();
+        portDescriptor.setVersionedComponentId(proposed.getIdentifier());
+
+        // Batch settings
+        portDescriptor.setBatchCount(proposed.getBatchSize().getCount());
+        portDescriptor.setBatchDuration(proposed.getBatchSize().getDuration());
+        portDescriptor.setBatchSize(proposed.getBatchSize().getSize());
+
+        portDescriptor.setComments(proposed.getComments());
+        portDescriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount());
+        portDescriptor.setGroupId(proposed.getGroupId());
+
+        // TODO: Need to address this issue of port id's
+        final String randomId = UUID.randomUUID().toString();
+        portDescriptor.setId(randomId);
+
+        portDescriptor.setName(proposed.getName());
+        portDescriptor.setUseCompression(proposed.isUseCompression());
+        return portDescriptor;
+    }
+
+
+    /**
+     * Determines whether this group differs from the versioned snapshot it is tracking.
+     *
+     * @return {@code Optional.of(false)} when there is no version control information; an empty
+     *         Optional when the versioned snapshot has not been retrieved yet; otherwise whether any
+     *         difference other than a position or style change exists between the local flow and the snapshot
+     */
+    public Optional<Boolean> isModified() {
+        final StandardVersionControlInformation vci = versionControlInfo.get();
+
+        // Without version control information there is no snapshot to differ from.
+        if (vci == null) {
+            return Optional.of(Boolean.FALSE);
+        }
+
+        // The flow has not been retrieved from the Flow Registry yet, so whether it has been
+        // modified is unknown. Signal that with an empty optional.
+        if (vci.getFlowSnapshot() == null) {
+            return Optional.empty();
+        }
+
+        // A "dirty" flag is not enough: if a user added a processor and then removed it again, the
+        // flag would stay set forever even though the flow is back to its versioned state. So the
+        // current flow is mapped and diffed against the versioned snapshot.
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
+
+        final ComparableDataFlow currentFlow = new ComparableDataFlow() {
+            @Override
+            public String getName() {
+                return "Local Flow";
+            }
+
+            @Override
+            public VersionedProcessGroup getContents() {
+                return versionedGroup;
+            }
+        };
+
+        final ComparableDataFlow snapshotFlow = new ComparableDataFlow() {
+            @Override
+            public String getName() {
+                return "Flow Under Version Control";
+            }
+
+            @Override
+            public VersionedProcessGroup getContents() {
+                return vci.getFlowSnapshot();
+            }
+        };
+
+        final FlowComparison comparison = new StandardFlowComparator(currentFlow, snapshotFlow).compare();
+        final Set<FlowDifference> differences = comparison.getDifferences();
+
+        // Position and style changes do not count as modifications.
+        final boolean modified = differences.stream()
+            .anyMatch(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED
+                && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED);
+
+        LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences);
+        return Optional.of(modified);
+    }
+
+
+    @Override
+    public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final 
boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
+        readLock.lock();
+        try {
+            final VersionControlInformation versionControlInfo = 
getVersionControlInformation();
+            if (versionControlInfo == null) {
+                throw new IllegalStateException("Cannot update the Version of 
the flow for " + this
+                    + " because the Process Group is not currently under 
Version Control");
+            }
+
+            if 
(!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier()))
 {
+                throw new IllegalStateException(this + " is under version 
control but the given flow does not match the flow that this Process Group is 
synchronized with");
+            }
+
+            if (verifyNotDirty) {
+                final Optional<Boolean> modifiedOption = 
versionControlInfo.getModified();
+                if (!modifiedOption.isPresent()) {
+                    throw new IllegalStateException(this + " cannot be updated 
to a different version of the flow because the local flow "
+                        + "has not yet been synchronized with the Flow 
Registry. The Process Group must be"
+                        + " synched with the Flow Registry before continuing. 
This will happen periodically in the background, so please try the request 
again later");
+                }
+
+                if (Boolean.TRUE.equals(modifiedOption.get())) {
+                    throw new IllegalStateException("Cannot change the Version 
of the flow for " + this
+                        + " because the Process Group has been modified since 
it was last synchronized with the Flow Registry. The Process Group must be"
+                        + " restored to its original form before changing the 
version");
+                }
+            }
+
+            final VersionedProcessGroup flowContents = 
updatedFlow.getFlowContents();
+            if (verifyConnectionRemoval) {
+                // Determine which Connections have been removed.
+                final Map<String, Connection> removedConnectionByVersionedId = 
new HashMap<>();
+                findAllConnections().stream()
+                    .filter(conn -> conn.getVersionedComponentId().isPresent())
+                    .forEach(conn -> 
removedConnectionByVersionedId.put(conn.getVersionedComponentId().get(), conn));
+
+                final Set<String> proposedFlowConnectionIds = new HashSet<>();
+                findAllConnectionIds(flowContents, proposedFlowConnectionIds);
+
+                for (final String proposedConnectionId : 
proposedFlowConnectionIds) {
+                    
removedConnectionByVersionedId.remove(proposedConnectionId);
+                }
+
+                // If any connection that was removed has data in it, throw an 
IllegalStateException
+                for (final Connection connection : 
removedConnectionByVersionedId.values()) {
+                    final FlowFileQueue flowFileQueue = 
connection.getFlowFileQueue();
+                    if (!flowFileQueue.isEmpty()) {
+                        throw new IllegalStateException(this + " cannot be 
updated to the proposed version of the flow because the "
+                            + "proposed version does not contain "
+                            + connection + " and the connection currently has 
data in the queue.");
+                    }
+                }
+            }
+
+            // Determine which input ports were removed from this process group
+            final Map<String, Port> removedInputPortsByVersionId = new 
HashMap<>();
+            getInputPorts().stream()
+                .filter(port -> port.getVersionedComponentId().isPresent())
+                .forEach(port -> 
removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
+            flowContents.getInputPorts().stream()
+                .map(VersionedPort::getIdentifier)
+                .forEach(id -> removedInputPortsByVersionId.remove(id));
+
+            // Ensure that there are no incoming connections for any Input 
Port that was removed.
+            for (final Port inputPort : removedInputPortsByVersionId.values()) 
{
+                final List<Connection> incomingConnections = 
inputPort.getIncomingConnections();
+                if (!incomingConnections.isEmpty()) {
+                    throw new IllegalStateException(this + " cannot be updated 
to the proposed version of the flow because the proposed version does not 
contain the Input Port "
+                        + inputPort + " and the Input Port currently has an 
incoming connections");
+                }
+            }
+
+            // Determine which output ports were removed from this process 
group
+            final Map<String, Port> removedOutputPortsByVersionId = new 
HashMap<>();
+            getOutputPorts().stream()
+                .filter(port -> port.getVersionedComponentId().isPresent())
+                .forEach(port -> 
removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
+            flowContents.getOutputPorts().stream()
+                .map(VersionedPort::getIdentifier)
+                .forEach(id -> removedOutputPortsByVersionId.remove(id));
+
+            // Ensure that there are no outgoing connections for any Output 
Port that was removed.
+            for (final Port outputPort : 
removedOutputPortsByVersionId.values()) {
+                final Set<Connection> outgoingConnections = 
outputPort.getConnections();
+                if (!outgoingConnections.isEmpty()) {
+                    throw new IllegalStateException(this + " cannot be updated 
to the proposed version of the flow because the proposed version does not 
contain the Output Port "
+                        + outputPort + " and the Output Port currently has an 
outgoing connections");
+                }
+            }
+
+            // Find any Process Groups that may have been deleted. If we find 
any Process Group that was deleted, and that Process Group
+            // has Templates, then we fail because the Templates have to be 
removed first.
+            final Map<String, VersionedProcessGroup> proposedProcessGroups = 
new HashMap<>();
+            findAllProcessGroups(updatedFlow.getFlowContents(), 
proposedProcessGroups);
+
+            for (final ProcessGroup childGroup : findAllProcessGroups()) {
+                if (childGroup.getTemplates().isEmpty()) {
+                    continue;
+                }
+
+                final Optional<String> versionedIdOption = 
childGroup.getVersionedComponentId();
+                if (!versionedIdOption.isPresent()) {
+                    continue;
+                }
+
+                final String versionedId = versionedIdOption.get();
+                if (!proposedProcessGroups.containsKey(versionedId)) {
+                    // Process Group was removed.
+                    throw new IllegalStateException(this + " cannot be updated 
to the proposed version of the flow because the child " + childGroup
+                        + " that exists locally has one or more Templates, and 
the proposed flow does not contain this Process Group. "
+                        + "A Process Group cannot be deleted while it contains 
Templates. Please remove the Templates before attempting to chnage the version 
of the flow.");
+                }
+            }
+
+
+            // Ensure that all Processors are instantiate-able.
+            final Map<String, VersionedProcessor> proposedProcessors = new 
HashMap<>();
+            findAllProcessors(updatedFlow.getFlowContents(), 
proposedProcessors);
+
+            findAllProcessors().stream()
+                .filter(proc -> proc.getVersionedComponentId().isPresent())
+                .forEach(proc -> 
proposedProcessors.remove(proc.getVersionedComponentId().get()));
+
+            for (final VersionedProcessor processorToAdd : 
proposedProcessors.values()) {
+                final BundleCoordinate coordinate = 
toCoordinate(processorToAdd.getBundle());
+                try {
+                    flowController.createProcessor(processorToAdd.getType(), 
UUID.randomUUID().toString(), coordinate, false);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Unable to create 
Processor of type " + processorToAdd.getType(), e);
+                }
+            }
+
+            // Ensure that all Controller Services are instantiate-able.
+            final Map<String, VersionedControllerService> proposedServices = 
new HashMap<>();
+            findAllControllerServices(updatedFlow.getFlowContents(), 
proposedServices);
+
+            findAllControllerServices().stream()
+                .filter(service -> 
service.getVersionedComponentId().isPresent())
+                .forEach(service -> 
proposedServices.remove(service.getVersionedComponentId().get()));
+
+            for (final VersionedControllerService serviceToAdd : 
proposedServices.values()) {
+                final BundleCoordinate coordinate = 
toCoordinate(serviceToAdd.getBundle());
+                try {
+                    
flowController.createControllerService(serviceToAdd.getType(), 
UUID.randomUUID().toString(), coordinate, Collections.emptySet(), false);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Unable to create 
Controller Service of type " + serviceToAdd.getType(), e);
+                }
+            }
+
+            // Ensure that all Prioritizers are instantiate-able.
+            final Map<String, VersionedConnection> proposedConnections = new 
HashMap<>();
+            findAllConnections(updatedFlow.getFlowContents(), 
proposedConnections);
+
+            findAllConnections().stream()
+                .filter(conn -> conn.getVersionedComponentId().isPresent())
+                .forEach(conn -> 
proposedConnections.remove(conn.getVersionedComponentId().get()));
+
+            for (final VersionedConnection connectionToAdd : 
proposedConnections.values()) {
+                for (final String prioritizerType : 
connectionToAdd.getPrioritizers()) {
+                    try {
+                        flowController.createPrioritizer(prioritizerType);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException("Unable to create 
Prioritizer of type " + prioritizerType, e);
+                    }
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Collects the identifier of every Connection in the given versioned group and, recursively,
+     * in each of its child groups.
+     *
+     * @param group the versioned Process Group to walk
+     * @param ids the set that receives each Connection identifier
+     */
+    private void findAllConnectionIds(final VersionedProcessGroup group, final Set<String> ids) {
+        group.getConnections().forEach(versionedConnection -> ids.add(versionedConnection.getIdentifier()));
+        group.getProcessGroups().forEach(nestedGroup -> findAllConnectionIds(nestedGroup, ids));
+    }
+
+    /**
+     * Indexes every Processor in the given versioned group, and recursively in each of its child groups,
+     * by the Processor's identifier.
+     *
+     * @param group the versioned Process Group to walk
+     * @param map the map that receives an identifier-to-Processor entry for each Processor found
+     */
+    private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) {
+        group.getProcessors().forEach(versionedProcessor -> map.put(versionedProcessor.getIdentifier(), versionedProcessor));
+        group.getProcessGroups().forEach(nestedGroup -> findAllProcessors(nestedGroup, map));
+    }
+
+    /**
+     * Indexes every Controller Service in the given versioned group, and recursively in each of its
+     * child groups, by the service's identifier.
+     *
+     * @param group the versioned Process Group to walk
+     * @param map the map that receives an identifier-to-service entry for each Controller Service found
+     */
+    private void findAllControllerServices(final VersionedProcessGroup group, final Map<String, VersionedControllerService> map) {
+        group.getControllerServices().forEach(versionedService -> map.put(versionedService.getIdentifier(), versionedService));
+        group.getProcessGroups().forEach(nestedGroup -> findAllControllerServices(nestedGroup, map));
+    }
+
+    /**
+     * Indexes every Connection in the given versioned group, and recursively in each of its child groups,
+     * by the Connection's identifier.
+     *
+     * @param group the versioned Process Group to walk
+     * @param map the map that receives an identifier-to-Connection entry for each Connection found
+     */
+    private void findAllConnections(final VersionedProcessGroup group, final Map<String, VersionedConnection> map) {
+        group.getConnections().forEach(versionedConnection -> map.put(versionedConnection.getIdentifier(), versionedConnection));
+        group.getProcessGroups().forEach(nestedGroup -> findAllConnections(nestedGroup, map));
+    }
+
+    /**
+     * Indexes the given versioned group and, recursively, all of its descendant groups by identifier.
+     * The group passed in is itself included in the map.
+     *
+     * @param group the versioned Process Group to walk
+     * @param map the map that receives an identifier-to-group entry for each group visited
+     */
+    private void findAllProcessGroups(final VersionedProcessGroup group, final Map<String, VersionedProcessGroup> map) {
+        // Register this group before descending so that the root of the walk is part of the result.
+        map.put(group.getIdentifier(), group);
+        group.getProcessGroups().forEach(nestedGroup -> findAllProcessGroups(nestedGroup, map));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
new file mode 100644
index 0000000..22ba50b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.DefaultPrettyPrinter;
+
+/**
+ * A simple file-based implementation of a Flow Registry Client. Rather than interacting
+ * with an actual Flow Registry, this implementation simply reads flows from disk and writes
+ * them to disk. It is not meant for any production use but is available for testing purposes.
+ *
+ * <p>
+ * Data is laid out beneath the configured directory as {@code <bucket id>/<flow id>/flow.properties}
+ * for the flow's metadata and {@code <bucket id>/<flow id>/<version>/} for each snapshot. A version
+ * directory holds the serialized flow contents together with a {@code snapshot.properties} file.
+ * </p>
+ */
+public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry {
+    private static final String DEFAULT_REGISTRY_ID = "default";
+    private static final String FLOW_PROPERTIES_FILENAME = "flow.properties";
+    private static final String SNAPSHOT_PROPERTIES_FILENAME = "snapshot.properties";
+    // The contents are written as JSON despite the extension; the name is kept so that existing snapshots stay readable.
+    private static final String SNAPSHOT_CONTENTS_FILENAME = "flow.xml";
+
+    private final File directory;
+    private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
+    private final JsonFactory jsonFactory = new JsonFactory();
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates a client that stores its flows beneath the given directory, creating the directory if needed
+     * and loading the names of any flows that already exist there.
+     *
+     * @param directory the root directory of the registry
+     * @throws IOException if the directory cannot be created or its contents cannot be listed or read
+     */
+    public FileBasedFlowRegistryClient(final File directory) throws IOException {
+        if (!directory.exists() && !directory.mkdirs()) {
+            throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
+        }
+
+        this.directory = directory;
+
+        // Configured once up front so that the shared mapper is never reconfigured while in use.
+        objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        recoverBuckets();
+    }
+
+    /**
+     * Rebuilds the in-memory index of flow names per bucket from the {@code flow.properties} files on disk.
+     *
+     * @throws IOException if a directory cannot be listed or a properties file cannot be read
+     */
+    private void recoverBuckets() throws IOException {
+        final File[] bucketDirs = directory.listFiles();
+        if (bucketDirs == null) {
+            throw new IOException("Could not get listing of directory " + directory);
+        }
+
+        for (final File bucketDir : bucketDirs) {
+            // A plain file in the registry root is not a bucket; skip it instead of failing startup.
+            if (!bucketDir.isDirectory()) {
+                continue;
+            }
+
+            final File[] flowDirs = bucketDir.listFiles();
+            if (flowDirs == null) {
+                throw new IOException("Could not get listing of directory " + bucketDir);
+            }
+
+            final Set<String> flowNames = new HashSet<>();
+            for (final File flowDir : flowDirs) {
+                final File propsFile = new File(flowDir, FLOW_PROPERTIES_FILENAME);
+                if (!propsFile.exists()) {
+                    continue;
+                }
+
+                final String flowName = loadProperties(propsFile).getProperty("name");
+                if (flowName != null) {
+                    flowNames.add(flowName);
+                }
+            }
+
+            if (!flowNames.isEmpty()) {
+                flowNamesByBucket.put(bucketDir.getName(), flowNames);
+            }
+        }
+    }
+
+    /**
+     * @param registryId the identifier of the registry to look up
+     * @return this client if the identifier is {@code "default"}, otherwise {@code null}
+     */
+    @Override
+    public FlowRegistry getFlowRegistry(final String registryId) {
+        return DEFAULT_REGISTRY_ID.equals(registryId) ? this : null;
+    }
+
+    /**
+     * @return the URI of the registry's root directory, as a String
+     */
+    @Override
+    public String getURL() {
+        return directory.toURI().toString();
+    }
+
+    /**
+     * Registers a new flow in an existing bucket by creating a directory for it and writing its metadata.
+     *
+     * @param flow the flow to register; its bucket identifier and name must not be null
+     * @return a copy of the given flow that carries the newly generated flow identifier
+     * @throws IOException if the flow directory or its properties file cannot be written
+     * @throws UnknownResourceException if the bucket does not exist
+     * @throws IllegalArgumentException if the bucket already contains a flow with the same name
+     */
+    @Override
+    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
+        Objects.requireNonNull(flow);
+        Objects.requireNonNull(flow.getBucketIdentifier());
+        Objects.requireNonNull(flow.getName());
+
+        final String bucketId = flow.getBucketIdentifier();
+        final File bucketDir = getBucketDirectory(bucketId);
+
+        // Verify that there is no flow with the same name in that bucket
+        final Set<String> flowNames = flowNamesByBucket.get(bucketId);
+        if (flowNames != null && flowNames.contains(flow.getName())) {
+            throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + bucketId);
+        }
+
+        final String flowIdentifier = UUID.randomUUID().toString();
+        final File flowDir = new File(bucketDir, flowIdentifier);
+        if (!flowDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + flowDir + " for new Flow");
+        }
+
+        final Properties flowProperties = new Properties();
+        flowProperties.setProperty("name", flow.getName());
+        flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
+        setPropertyIfPresent(flowProperties, "description", flow.getDescription());
+        flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
+        storeProperties(flowProperties, new File(flowDir, FLOW_PROPERTIES_FILENAME));
+
+        // Remember the name so that a second registration with the same name is rejected without a restart.
+        flowNamesByBucket.computeIfAbsent(bucketId, key -> new HashSet<>()).add(flow.getName());
+
+        final VersionedFlow response = new VersionedFlow();
+        response.setBucketIdentifier(bucketId);
+        response.setCreatedTimestamp(flow.getCreatedTimestamp());
+        response.setDescription(flow.getDescription());
+        response.setIdentifier(flowIdentifier);
+        response.setModifiedTimestamp(flow.getModifiedTimestamp());
+        response.setName(flow.getName());
+
+        return response;
+    }
+
+    /**
+     * Stores a new snapshot of an existing flow, using a version number one greater than the highest
+     * version currently on disk.
+     *
+     * @param flow the flow that the snapshot belongs to; its bucket identifier, identifier and name must not be null
+     * @param snapshot the flow contents to store
+     * @param comments optional comments describing the snapshot; may be null
+     * @return the stored snapshot together with its metadata
+     * @throws IOException if the snapshot cannot be written
+     * @throws UnknownResourceException if the bucket or the flow does not exist
+     */
+    @Override
+    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
+            throws IOException, UnknownResourceException {
+        Objects.requireNonNull(flow);
+        Objects.requireNonNull(flow.getBucketIdentifier());
+        Objects.requireNonNull(flow.getIdentifier());
+        Objects.requireNonNull(flow.getName());
+        Objects.requireNonNull(snapshot);
+
+        final File flowDir = getFlowDirectory(flow.getBucketIdentifier(), flow.getIdentifier());
+
+        final int snapshotVersion = findMaxVersion(flowDir) + 1;
+        final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
+        if (!snapshotDir.mkdir()) {
+            throw new IOException("Could not create directory " + snapshotDir);
+        }
+
+        final File contentsFile = new File(snapshotDir, SNAPSHOT_CONTENTS_FILENAME);
+        try (final OutputStream out = new FileOutputStream(contentsFile);
+            final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) {
+            generator.setCodec(objectMapper);
+            generator.setPrettyPrinter(new DefaultPrettyPrinter());
+            generator.writeObject(snapshot);
+        }
+
+        final Properties snapshotProperties = new Properties();
+        setPropertyIfPresent(snapshotProperties, "comments", comments);
+        snapshotProperties.setProperty("name", flow.getName());
+        storeProperties(snapshotProperties, new File(snapshotDir, SNAPSHOT_PROPERTIES_FILENAME));
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
+        snapshotMetadata.setComments(comments);
+        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
+        snapshotMetadata.setFlowName(flow.getName());
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(snapshotVersion);
+
+        final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
+        response.setSnapshotMetadata(snapshotMetadata);
+        response.setFlowContents(snapshot);
+        return response;
+    }
+
+    /**
+     * @return a set holding the single registry identifier {@code "default"}
+     */
+    @Override
+    public Set<String> getRegistryIdentifiers() {
+        return Collections.singleton(DEFAULT_REGISTRY_ID);
+    }
+
+    /**
+     * Determines the highest snapshot version stored for a flow.
+     *
+     * @param bucketId the identifier of the bucket that holds the flow
+     * @param flowId the identifier of the flow
+     * @return the highest version found, or 0 if the flow has no snapshots
+     * @throws IOException if the flow directory cannot be listed
+     * @throws UnknownResourceException if the bucket or the flow does not exist
+     */
+    @Override
+    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        return findMaxVersion(getFlowDirectory(bucketId, flowId));
+    }
+
+    /**
+     * Reads one snapshot of a flow from disk.
+     *
+     * @param bucketId the identifier of the bucket that holds the flow
+     * @param flowId the identifier of the flow
+     * @param version the snapshot version to read
+     * @return the snapshot contents together with its metadata
+     * @throws IOException if the snapshot files cannot be read
+     * @throws UnknownResourceException if the bucket, the flow, or the requested version does not exist
+     */
+    @Override
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
+        final File flowDir = getFlowDirectory(bucketId, flowId);
+
+        final File versionDir = new File(flowDir, String.valueOf(version));
+        if (!versionDir.exists()) {
+            throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
+        }
+
+        final File contentsFile = new File(versionDir, SNAPSHOT_CONTENTS_FILENAME);
+
+        final VersionedProcessGroup processGroup;
+        try (final JsonParser parser = jsonFactory.createJsonParser(contentsFile)) {
+            parser.setCodec(objectMapper);
+            processGroup = parser.readValueAs(VersionedProcessGroup.class);
+        }
+
+        final Properties properties = loadProperties(new File(versionDir, SNAPSHOT_PROPERTIES_FILENAME));
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucketId);
+        snapshotMetadata.setComments(properties.getProperty("comments"));
+        snapshotMetadata.setFlowIdentifier(flowId);
+        snapshotMetadata.setFlowName(properties.getProperty("name"));
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(version);
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(processGroup);
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+
+        return snapshot;
+    }
+
+    /**
+     * Reads a flow's metadata, including the metadata of each of its snapshots ordered by version.
+     *
+     * @param bucketId the identifier of the bucket that holds the flow
+     * @param flowId the identifier of the flow
+     * @return the flow along with the metadata of all of its snapshots
+     * @throws IOException if the flow's files cannot be listed or read, or its creation timestamp is missing or malformed
+     * @throws UnknownResourceException if the bucket or the flow does not exist
+     */
+    @Override
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        final File flowDir = getFlowDirectory(bucketId, flowId);
+
+        final File flowPropsFile = new File(flowDir, FLOW_PROPERTIES_FILENAME);
+        final Properties flowProperties = loadProperties(flowPropsFile);
+
+        final long createdTimestamp;
+        try {
+            createdTimestamp = Long.parseLong(flowProperties.getProperty("created"));
+        } catch (final NumberFormatException nfe) {
+            throw new IOException("Flow properties file " + flowPropsFile + " does not contain a valid 'created' timestamp", nfe);
+        }
+
+        final VersionedFlow flow = new VersionedFlow();
+        flow.setBucketIdentifier(bucketId);
+        flow.setCreatedTimestamp(createdTimestamp);
+        flow.setDescription(flowProperties.getProperty("description"));
+        flow.setIdentifier(flowId);
+        flow.setModifiedTimestamp(flowDir.lastModified());
+        flow.setName(flowProperties.getProperty("name"));
+
+        final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
+
+        final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
+        flow.setSnapshotMetadata(snapshotMetadataSet);
+
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Unable to perform listing of directory " + flowDir);
+        }
+
+        for (final File file : versionDirs) {
+            if (!file.isDirectory()) {
+                continue;
+            }
+
+            final Integer version = parseVersion(file.getName());
+            if (version == null) {
+                // not a version. skip.
+                continue;
+            }
+
+            final Properties snapshotProperties = loadProperties(new File(file, SNAPSHOT_PROPERTIES_FILENAME));
+
+            final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+            metadata.setBucketIdentifier(bucketId);
+            metadata.setComments(snapshotProperties.getProperty("comments"));
+            metadata.setFlowIdentifier(flowId);
+            metadata.setFlowName(snapshotProperties.getProperty("name"));
+            metadata.setTimestamp(file.lastModified());
+            metadata.setVersion(version);
+
+            snapshotMetadataSet.add(metadata);
+        }
+
+        return flow;
+    }
+
+    /**
+     * Resolves the directory of a bucket, verifying that it exists.
+     */
+    private File getBucketDirectory(final String bucketId) throws UnknownResourceException {
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        return bucketDir;
+    }
+
+    /**
+     * Resolves the directory of a flow, verifying that both the bucket and the flow exist.
+     */
+    private File getFlowDirectory(final String bucketId, final String flowId) throws UnknownResourceException {
+        final File flowDir = new File(getBucketDirectory(bucketId), flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+        }
+
+        return flowDir;
+    }
+
+    /**
+     * Returns the largest positive integer that names an entry of the given flow directory, or 0 if there is none.
+     */
+    private static int findMaxVersion(final File flowDir) throws IOException {
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Unable to perform listing of directory " + flowDir);
+        }
+
+        int maxVersion = 0;
+        for (final File versionDir : versionDirs) {
+            final Integer version = parseVersion(versionDir.getName());
+            if (version != null && version > maxVersion) {
+                maxVersion = version;
+            }
+        }
+
+        return maxVersion;
+    }
+
+    /**
+     * Parses a file name as a version number, returning {@code null} if the name is not an integer.
+     */
+    private static Integer parseVersion(final String name) {
+        try {
+            return Integer.valueOf(name);
+        } catch (final NumberFormatException nfe) {
+            return null;
+        }
+    }
+
+    /**
+     * Loads the given properties file, closing the underlying stream when done.
+     */
+    private static Properties loadProperties(final File file) throws IOException {
+        final Properties properties = new Properties();
+        try (final InputStream in = new FileInputStream(file)) {
+            properties.load(in);
+        }
+
+        return properties;
+    }
+
+    /**
+     * Writes the given properties to the given file, closing the underlying stream when done.
+     */
+    private static void storeProperties(final Properties properties, final File file) throws IOException {
+        try (final OutputStream out = new FileOutputStream(file)) {
+            properties.store(out, null);
+        }
+    }
+
+    /**
+     * Sets the property only when the value is non-null, because {@link Properties#setProperty(String, String)} rejects null values.
+     */
+    private static void setPropertyIfPresent(final Properties properties, final String key, final String value) {
+        if (value != null) {
+            properties.setProperty(key, value);
+        }
+    }
+}

Reply via email to