NIFI-4436: More intelligently flag a ProcessGroup to indicate whether or not it 
has any local modifications compared to Versioned Flow - Bug fixes - Updated to 
include status of a Versioned Process Group to include VersionedFlowState and 
explanation

Signed-off-by: Matt Gilman <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fdef5b56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fdef5b56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fdef5b56

Branch: refs/heads/master
Commit: fdef5b560544a8da33068b4acb6d4404fe193ed9
Parents: d34fb5e
Author: Mark Payne <[email protected]>
Authored: Tue Nov 28 12:33:00 2017 -0500
Committer: Bryan Bende <[email protected]>
Committed: Mon Jan 8 12:44:54 2018 -0500

----------------------------------------------------------------------
 .../api/dto/VersionControlInformationDTO.java   |  22 ++
 .../service/ControllerServiceNode.java          |   3 +-
 .../org/apache/nifi/groups/ProcessGroup.java    |  13 +-
 .../apache/nifi/groups/RemoteProcessGroup.java  |   9 +-
 .../flow/VersionControlInformation.java         |   5 +
 .../nifi/registry/flow/VersionedFlowState.java  |  52 +++
 .../nifi/registry/flow/VersionedFlowStatus.java |  31 ++
 .../apache/nifi/controller/FlowController.java  |  76 ++--
 .../controller/StandardFlowSynchronizer.java    |   5 +-
 .../nifi/groups/StandardProcessGroup.java       | 382 +++++++++++++++++--
 .../groups/StandardVersionedFlowStatus.java     |  50 +++
 .../flow/StandardVersionControlInformation.java |  17 +-
 .../flow/mapping/NiFiRegistryDtoMapper.java     | 328 ----------------
 .../flow/mapping/NiFiRegistryFlowMapper.java    |  97 +++--
 .../nifi/remote/StandardRemoteProcessGroup.java | 126 ++----
 .../service/mock/MockProcessGroup.java          |   6 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |  31 +-
 .../nifi/web/api/ProcessGroupResource.java      |   2 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  29 +-
 .../nifi/web/controller/ControllerFacade.java   |   5 +
 .../dao/impl/StandardControllerServiceDAO.java  |  20 +-
 .../nifi/web/dao/impl/StandardFunnelDAO.java    |   2 +
 .../nifi/web/dao/impl/StandardInputPortDAO.java |   1 +
 .../nifi/web/dao/impl/StandardLabelDAO.java     |   1 +
 .../web/dao/impl/StandardOutputPortDAO.java     |   1 +
 .../web/dao/impl/StandardProcessGroupDAO.java   |  13 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java |   1 +
 .../dao/impl/StandardRemoteProcessGroupDAO.java |   5 +-
 .../nifi/web/util/AffectedComponentUtils.java   |   4 +
 .../ClusterReplicationComponentLifecycle.java   |   6 +-
 30 files changed, 751 insertions(+), 592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
index c31a957..944b10a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
@@ -34,6 +34,8 @@ public class VersionControlInformationDTO {
     private Integer version;
     private Boolean modified;
     private Boolean current;
+    private String state;
+    private String stateExplanation;
 
     @ApiModelProperty("The ID of the Process Group that is under version 
control")
     public String getGroupId() {
@@ -135,4 +137,24 @@ public class VersionControlInformationDTO {
     public void setCurrent(Boolean current) {
         this.current = current;
     }
+
+    @ApiModelProperty(readOnly = true,
+        value = "The current state of the Process Group, as it relates to the 
Versioned Flow",
+        allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, 
STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")
+    public String getState() {
+        return state;
+    }
+
+    public void setState(final String state) {
+        this.state = state;
+    }
+
+    @ApiModelProperty(readOnly = true, value = "Explanation of why the group 
is in the specified state")
+    public String getStateExplanation() {
+        return stateExplanation;
+    }
+
+    public void setStateExplanation(String explanation) {
+        this.stateExplanation = explanation;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 2f28963..2219d6d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.service;
 
+import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
@@ -27,7 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
-public interface ControllerServiceNode extends ConfiguredComponent, 
VersionedComponent {
+public interface ControllerServiceNode extends ConfiguredComponent, 
ConfigurableComponent, VersionedComponent {
 
     /**
      * @return the Process Group that this Controller Service belongs to, or 
<code>null</code> if the Controller Service

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index d81b7d3..17131dd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -462,11 +462,11 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable, Versi
 
     /**
      * @param id of the Controller Service
-     * @return the Controller Service with the given ID, if it exists as a 
child or
-     *         descendant of this ProcessGroup. This performs a recursive 
search of all
-     *         descendant ProcessGroups
+     * @param includeDescendantGroups whether or not to include descendant 
process groups
+     * @param includeAncestorGroups whether or not to include ancestor process 
groups
+     * @return the Controller Service with the given ID
      */
-    ControllerServiceNode findControllerService(String id);
+    ControllerServiceNode findControllerService(String id, boolean 
includeDescendantGroups, boolean includeAncestorGroups);
 
     /**
      * @return a List of all Controller Services contained within this 
ProcessGroup and any child Process Groups
@@ -976,4 +976,9 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable, Versi
      * @param flowRegistry the Flow Registry to synchronize with
      */
     void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry);
+
+    /**
+     * Called whenever a component within this group or the group itself is 
modified
+     */
+    void onComponentModified();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 0dd6070..7d92246 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -61,9 +61,9 @@ public interface RemoteProcessGroup extends 
ComponentAuthorizable, Positionable,
 
     void setName(String name);
 
-    void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+    void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean 
pruneUnusedPorts);
 
-    void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+    void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean 
pruneUnusedPorts);
 
     Set<RemoteGroupPort> getInputPorts();
 
@@ -216,11 +216,6 @@ public interface RemoteProcessGroup extends 
ComponentAuthorizable, Positionable,
     void reinitialize(boolean isClustered);
 
     /**
-     * Removes all non existent ports from this RemoteProcessGroup.
-     */
-    void removeAllNonExistentPorts();
-
-    /**
      * Removes a port that no longer exists on the remote instance from this
      * RemoteProcessGroup
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
index b54a1c9..1f65a19 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
@@ -77,6 +77,11 @@ public interface VersionControlInformation {
     boolean isCurrent();
 
     /**
+     * @return the current status of the Process Group as it relates to the 
associated Versioned Flow.
+     */
+    VersionedFlowStatus getStatus();
+
+    /**
      * @return the snapshot of the flow that was synchronized with the Flow 
Registry
      */
     VersionedProcessGroup getFlowSnapshot();

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
new file mode 100644
index 0000000..d20a13f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+public enum VersionedFlowState {
+
+    /**
+     * We are unable to communicate with the Flow Registry in order to 
determine the appropriate state
+     */
+    SYNC_FAILURE,
+
+    /**
+     * This Process Group (or a child/descendant Process Group that is not 
itself under Version Control)
+     * is on the latest version of the Versioned Flow, but is different than 
the Versioned Flow that is
+     * stored in the Flow Registry.
+     */
+    LOCALLY_MODIFIED,
+
+    /**
+     * This Process Group has not been modified since it was last synchronized 
with the Flow Registry, but
+     * the Flow Registry has a newer version of the flow than what is 
contained in this Process Group.
+     */
+    STALE,
+
+    /**
+     * This Process Group (or a child/descendant Process Group that is not 
itself under Version Control)
+     * has been modified since it was last synchronized with the Flow 
Registry, and the Flow Registry has
+     * a newer version of the flow than what is contained in this Process 
Group.
+     */
+    LOCALLY_MODIFIED_AND_STALE,
+
+    /**
+     * This Process Group and all child/descendant Process Groups are on the 
latest version of the flow in
+     * the Flow Registry and have no local modifications.
+     */
+    UP_TO_DATE;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
new file mode 100644
index 0000000..9b58d9a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+public interface VersionedFlowStatus {
+
+    /**
+     * @return the current state of the versioned process group
+     */
+    VersionedFlowState getState();
+
+    /**
+     * @return an explanation of why the process group is in the state that it 
is in.
+     */
+    String getStateExplanation();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3909387..2afa9dc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,39 @@
  */
 package org.apache.nifi.controller;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.collections4.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -169,7 +202,6 @@ import 
org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedConnection;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -225,38 +257,6 @@ import 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider,
     QueueProvider, Authorizable, ProvenanceAuthorizableFactory, 
NodeTypeProvider, IdentifierLookup, ReloadComponent {
 
@@ -1983,14 +1983,14 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 if (remoteGroupDTO.getContents() != null) {
                     final RemoteProcessGroupContentsDTO contents = 
remoteGroupDTO.getContents();
 
-                    // ensure there input ports
+                    // ensure there are input ports
                     if (contents.getInputPorts() != null) {
-                        
remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()));
+                        
remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
                     }
 
                     // ensure there are output ports
                     if (contents.getOutputPorts() != null) {
-                        
remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()));
+                        
remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
                     }
                 }
 
@@ -2035,12 +2035,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 instantiateSnippet(childGroup, childTemplateDTO, false);
 
                 if (groupDTO.getVersionControlInformation() != null) {
-                    final NiFiRegistryFlowMapper flowMapper = new 
NiFiRegistryFlowMapper();
-                    final VersionedProcessGroup versionedGroup = 
flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false);
-
                     final VersionControlInformation vci = 
StandardVersionControlInformation.Builder
                         .fromDto(groupDTO.getVersionControlInformation())
-                        .flowSnapshot(versionedGroup)
                         .build();
                     childGroup.setVersionControlInformation(vci, 
Collections.emptyMap());
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 71a587c..28d9b79 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -931,6 +931,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             final Label label = controller.createLabel(labelDTO.getId(), 
labelDTO.getLabel());
             label.setStyle(labelDTO.getStyle());
             label.setPosition(new Position(labelDTO.getPosition().getX(), 
labelDTO.getPosition().getY()));
+            label.setVersionedComponentId(labelDTO.getVersionedComponentId());
             if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
                 label.setSize(new Size(labelDTO.getWidth(), 
labelDTO.getHeight()));
             }
@@ -1327,13 +1328,13 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             for (final Element portElement : 
getChildrenByTagName(remoteProcessGroupElement, "inputPort")) {
                 
inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
             }
-            remoteGroup.setInputPorts(inputPorts);
+            remoteGroup.setInputPorts(inputPorts, false);
 
             final Set<RemoteProcessGroupPortDescriptor> outputPorts = new 
HashSet<>();
             for (final Element portElement : 
getChildrenByTagName(remoteProcessGroupElement, "outputPort")) {
                 
outputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
             }
-            remoteGroup.setOutputPorts(outputPorts);
+            remoteGroup.setOutputPorts(outputPorts, false);
             processGroup.addRemoteProcessGroup(remoteGroup);
 
             for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : 
outputPorts) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 9a14464..4b186a9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -83,11 +83,14 @@ import 
org.apache.nifi.registry.flow.VersionedControllerService;
 import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
 import org.apache.nifi.registry.flow.VersionedFunnel;
 import org.apache.nifi.registry.flow.VersionedLabel;
 import org.apache.nifi.registry.flow.VersionedPort;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
 import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
 import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
 import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
@@ -166,6 +169,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     private final Map<String, Template> templates = new HashMap<>();
     private final StringEncryptor encryptor;
     private final MutableVariableRegistry variableRegistry;
+    private final AtomicReference<StandardVersionedFlowStatus> flowStatus = 
new AtomicReference<>(
+        new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow 
Registry", null));
 
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -494,6 +499,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             port.setProcessGroup(this);
             inputPorts.put(requireNonNull(port).getIdentifier(), port);
             flowController.onInputPortAdded(port);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -528,6 +534,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException(port.getIdentifier() + " is 
not an Input Port of this Process Group");
             }
 
+            onComponentModified();
+
             flowController.onInputPortRemoved(port);
             LOG.info("Input Port {} removed from flow", port);
         } finally {
@@ -575,6 +583,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             port.setProcessGroup(this);
             outputPorts.put(port.getIdentifier(), port);
             flowController.onOutputPortAdded(port);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -600,6 +609,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException(port.getIdentifier() + " is 
not an Output Port of this Process Group");
             }
 
+            onComponentModified();
+
             flowController.onOutputPortRemoved(port);
             LOG.info("Output Port {} removed from flow", port);
         } finally {
@@ -640,6 +651,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             processGroups.put(Objects.requireNonNull(group).getIdentifier(), 
group);
             flowController.onProcessGroupAdded(group);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -679,6 +691,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             removeComponents(group);
             processGroups.remove(group.getIdentifier());
+            onComponentModified();
+
             flowController.onProcessGroupRemoved(group);
             LOG.info("{} removed from flow", group);
         } finally {
@@ -734,6 +748,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             remoteGroup.setProcessGroup(this);
             
remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), 
remoteGroup);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -767,6 +782,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 }
             }
 
+            onComponentModified();
+
             for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) {
                 // must copy to avoid a concurrent modification
                 final Set<Connection> copy = new 
HashSet<>(port.getConnections());
@@ -802,6 +819,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             processor.getVariableRegistry().setParent(getVariableRegistry());
             processors.put(processorId, processor);
             flowController.onProcessorAdded(processor);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -843,6 +861,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
 
             processors.remove(id);
+            onComponentModified();
+
             flowController.onProcessorRemoved(processor);
             
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
 
@@ -912,6 +932,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         writeLock.lock();
         try {
             connections.put(connection.getIdentifier(), connection);
+            onComponentModified();
             connection.setProcessGroup(this);
         } finally {
             writeLock.unlock();
@@ -983,6 +1004,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
             connections.put(connection.getIdentifier(), connection);
             flowController.onConnectionAdded(connection);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -1042,6 +1064,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             // remove the connection from our map
             connections.remove(connection.getIdentifier());
             LOG.info("{} removed from flow", connection);
+            onComponentModified();
+
             flowController.onConnectionRemoved(connection);
         } finally {
             writeLock.unlock();
@@ -1109,6 +1133,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             label.setProcessGroup(this);
             labels.put(label.getIdentifier(), label);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -1123,6 +1148,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException(label + " is not a member of 
this Process Group.");
             }
 
+            onComponentModified();
             LOG.info("Label with ID {} removed from flow", 
label.getIdentifier());
         } finally {
             writeLock.unlock();
@@ -1828,6 +1854,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             if (autoStart) {
                 startFunnel(funnel);
             }
+
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -1859,18 +1887,43 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
 
     @Override
-    public ControllerServiceNode findControllerService(final String id) {
-        return findControllerService(id, this);
+    public ControllerServiceNode findControllerService(final String id, final 
boolean includeDescendants, final boolean includeAncestors) {
+        ControllerServiceNode serviceNode;
+        if (includeDescendants) {
+            serviceNode = findDescendantControllerService(id, this);
+        } else {
+            serviceNode = getControllerService(id);
+        }
+
+        if (serviceNode == null && includeAncestors) {
+            serviceNode = findAncestorControllerService(id, getParent());
+        }
+
+        return serviceNode;
+    }
+
+    private ControllerServiceNode findAncestorControllerService(final String 
id, final ProcessGroup start) {
+        if (start == null) {
+            return null;
+        }
+
+        final ControllerServiceNode serviceNode = 
start.getControllerService(id);
+        if (serviceNode != null) {
+            return serviceNode;
+        }
+
+        final ProcessGroup parent = start.getParent();
+        return findAncestorControllerService(id, parent);
     }
 
-    private ControllerServiceNode findControllerService(final String id, final 
ProcessGroup start) {
+    private ControllerServiceNode findDescendantControllerService(final String 
id, final ProcessGroup start) {
         ControllerServiceNode service = start.getControllerService(id);
         if (service != null) {
             return service;
         }
 
         for (final ProcessGroup group : start.getProcessGroups()) {
-            service = findControllerService(id, group);
+            service = findDescendantControllerService(id, group);
             if (service != null) {
                 return service;
             }
@@ -1916,6 +1969,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
 
             funnels.remove(funnel.getIdentifier());
+            onComponentModified();
+
             flowController.onFunnelRemoved(funnel);
             LOG.info("{} removed from flow", funnel);
         } finally {
@@ -1947,6 +2002,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             service.getVariableRegistry().setParent(getVariableRegistry());
             this.controllerServices.put(service.getIdentifier(), service);
             LOG.info("{} added to {}", service, this);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -2010,6 +2066,21 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
 
             controllerServices.remove(service.getIdentifier());
+            onComponentModified();
+
+            // For any component that references this Controller Service, find 
the component's Process Group
+            // and notify the Process Group that a component has been 
modified. This way, we know to re-calculate
+            // whether or not the Process Group has local modifications.
+            service.getReferences().getReferencingComponents().stream()
+                .map(ConfiguredComponent::getProcessGroupIdentifier)
+                .filter(id -> !id.equals(getIdentifier()))
+                .forEach(groupId -> {
+                    final ProcessGroup descendant = findProcessGroup(groupId);
+                    if (descendant != null) {
+                        descendant.onComponentModified();
+                    }
+                });
+
             
flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
 
             removed = true;
@@ -2043,6 +2114,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             templates.put(id, template);
             template.setProcessGroup(this);
             LOG.info("{} added to {}", template, this);
+            onComponentModified();
         } finally {
             writeLock.unlock();
         }
@@ -2112,6 +2184,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
 
             templates.remove(template.getIdentifier());
+            onComponentModified();
+
             LOG.info("{} removed from flow", template);
         } finally {
             writeLock.unlock();
@@ -2172,6 +2246,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 toRemove.verifyCanDelete(true);
             }
 
+            onComponentModified();
+
             for (final String id : connectionIdsToRemove) {
                 removeConnection(connections.get(id));
             }
@@ -2224,6 +2300,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 throw new IllegalStateException("Cannot move Ports into the 
root group");
             }
 
+            onComponentModified();
+
             for (final String id : getKeys(snippet.getInputPorts())) {
                 destination.addInputPort(inputPorts.remove(id));
             }
@@ -2845,6 +2923,34 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     @Override
+    public void onComponentModified() {
+        // We no longer know if or how the Process Group has changed, so the 
next time that we
+        // get the local modifications, we must re-calculate it. We cannot 
simply assume that
+        // the flow was modified now, because if a Processor Property changed 
from 'A' to 'B',
+        // then back to 'A', then we have to know that it was not modified. So 
we set it to null
+        // to indicate that we must calculate the local modifications.
+        final StandardVersionControlInformation svci = 
this.versionControlInfo.get();
+        if (svci == null) {
+            // This group is not under version control directly. Notify parent.
+            final ProcessGroup parentGroup = parent.get();
+            if (parentGroup != null) {
+                parentGroup.onComponentModified();
+            }
+        }
+
+        clearFlowDifferences();
+    }
+
+    private void clearFlowDifferences() {
+        boolean updated = false;
+        while (!updated) {
+            final StandardVersionedFlowStatus status = flowStatus.get();
+            final StandardVersionedFlowStatus updatedStatus = new 
StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), 
null);
+            updated = flowStatus.compareAndSet(status, updatedStatus);
+        }
+    }
+
+    @Override
     public void setVersionControlInformation(final VersionControlInformation 
versionControlInformation, final Map<String, String> versionedComponentIds) {
         final StandardVersionControlInformation svci = new 
StandardVersionControlInformation(
             versionControlInformation.getRegistryIdentifier(),
@@ -2854,16 +2960,63 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             versionControlInformation.getVersion(),
             
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(),
 true),
             versionControlInformation.isModified(),
-            versionControlInformation.isCurrent()) {
+            versionControlInformation.isCurrent(),
+            versionControlInformation.getStatus()) {
 
             @Override
             public boolean isModified() {
-                final Set<FlowDifference> differences = 
StandardProcessGroup.this.getModifications();
-                if (differences == null) {
-                    return false;
+                boolean updated = false;
+                while (true) {
+                    final StandardVersionedFlowStatus status = 
flowStatus.get();
+                    Set<FlowDifference> differences = 
status.getCurrentDifferences();
+                    if (differences == null) {
+                        differences = getModifications();
+                        if (differences == null) {
+                            return false;
+                        }
+
+                        final StandardVersionedFlowStatus updatedStatus = new 
StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), 
differences);
+                        updated = flowStatus.compareAndSet(status, 
updatedStatus);
+
+                        if (updated) {
+                            return !differences.isEmpty();
+                        }
+
+                        continue;
+                    }
+
+                    return !differences.isEmpty();
+                }
+            }
+
+            @Override
+            public VersionedFlowStatus getStatus() {
+                // If current state is a sync failure, then
+                final StandardVersionedFlowStatus status = flowStatus.get();
+                final VersionedFlowState state = status.getState();
+                if (state == VersionedFlowState.SYNC_FAILURE) {
+                    return status;
                 }
 
-                return !differences.isEmpty();
+                final boolean modified = isModified();
+                if (!modified) {
+                    final VersionControlInformation vci = 
StandardProcessGroup.this.versionControlInfo.get();
+                    if (vci.getFlowSnapshot() == null) {
+                        return new 
StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has 
not yet been synchronized with Flow Registry", null);
+                    }
+                }
+
+                final boolean stale = !isCurrent();
+
+                if (modified && stale) {
+                    return new 
StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, 
"Local changes have been made and a newer version of this flow is available", 
null);
+                } else if (modified) {
+                    return new 
StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes 
have been made", null);
+                } else if (stale) {
+                    return new 
StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this 
flow is available", null);
+                } else {
+                    return new 
StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is 
current", null);
+                }
             }
         };
 
@@ -2875,6 +3028,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         try {
             updateVersionedComponentIds(this, versionedComponentIds);
             this.versionControlInfo.set(svci);
+            clearFlowDifferences();
         } finally {
             writeLock.unlock();
         }
@@ -2901,6 +3055,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         copy.setProcessors(processGroup.getProcessors());
         copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups());
         copy.setVariables(processGroup.getVariables());
+        copy.setLabels(processGroup.getLabels());
 
         final Set<VersionedProcessGroup> copyChildren = new HashSet<>();
 
@@ -2944,8 +3099,22 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
 
         applyVersionedComponentIds(processGroup, versionedComponentIds::get);
+
+        // If we versioned any parent groups' Controller Services, set their 
versioned component id's too.
+        final ProcessGroup parent = processGroup.getParent();
+        if (parent != null) {
+            for (final ControllerServiceNode service : 
parent.getControllerServices(true)) {
+                if (!service.getVersionedComponentId().isPresent()) {
+                    final String versionedId = 
versionedComponentIds.get(service.getIdentifier());
+                    if (versionedId != null) {
+                        service.setVersionedComponentId(versionedId);
+                    }
+                }
+            }
+        }
     }
 
+
     private void applyVersionedComponentIds(final ProcessGroup processGroup, 
final Function<String, String> lookup) {
         
processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
 
@@ -2980,6 +3149,14 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             .forEach(childGroup -> applyVersionedComponentIds(childGroup, 
lookup));
     }
 
+    private void setSyncFailedState(final String explanation) {
+        boolean updated = false;
+        while (!updated) {
+            final StandardVersionedFlowStatus status = flowStatus.get();
+            final StandardVersionedFlowStatus updatedStatus = new 
StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, 
status.getCurrentDifferences());
+            updated = flowStatus.compareAndSet(status, updatedStatus);
+        }
+    }
 
     @Override
     public void synchronizeWithFlowRegistry(final FlowRegistryClient 
flowRegistryClient) {
@@ -2991,6 +3168,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         final String registryId = vci.getRegistryIdentifier();
         final FlowRegistry flowRegistry = 
flowRegistryClient.getFlowRegistry(registryId);
         if (flowRegistry == null) {
+            final String message = String.format("Unable to synchronize 
Process Group with Flow Registry because Process Group was placed under Version 
Control using Flow Registry "
+                + "with identifier %s but cannot find any Flow Registry with 
this identifier", registryId);
+            setSyncFailedState(message);
+
             LOG.error("Unable to synchronize {} with Flow Registry because 
Process Group was placed under Version Control using Flow Registry "
                 + "with identifier {} but cannot find any Flow Registry with 
this identifier", this, registryId);
             return;
@@ -3005,8 +3186,12 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 final VersionedProcessGroup registryFlow = 
registrySnapshot.getFlowContents();
                 vci.setFlowSnapshot(registryFlow);
             } catch (final IOException | NiFiRegistryException e) {
+                final String message = String.format("Failed to synchronize 
Process Group with Flow Registry because could not retrieve version %s of flow 
with identifier %s in bucket %s",
+                    vci.getVersion(), vci.getFlowIdentifier(), 
vci.getBucketIdentifier());
+                setSyncFailedState(message);
+
                 LOG.error("Failed to synchronize {} with Flow Registry because 
could not retrieve version {} of flow with identifier {} in bucket {}",
-                    new Object[] {this, vci.getVersion(), 
vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e);
+                    this, vci.getVersion(), vci.getFlowIdentifier(), 
vci.getBucketIdentifier(), e);
                 return;
             }
         }
@@ -3027,7 +3212,17 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 LOG.info("{} is not the most recent version of the flow that 
is under Version Control; current version is {}; most recent version is {}",
                     new Object[] {this, vci.getVersion(), latestVersion});
             }
+
+            boolean updated = false;
+            while (!updated) {
+                final StandardVersionedFlowStatus status = flowStatus.get();
+                final StandardVersionedFlowStatus updatedStatus = new 
StandardVersionedFlowStatus(null, null, status.getCurrentDifferences());
+                updated = flowStatus.compareAndSet(status, updatedStatus);
+            }
         } catch (final IOException | NiFiRegistryException e) {
+            final String message = String.format("Failed to synchronize 
Process Group with Flow Registry because could not determine the most recent 
version of the Flow in the Flow Registry");
+            setSyncFailedState(message);
+
             LOG.error("Failed to synchronize {} with Flow Registry because 
could not determine the most recent version of the Flow in the Flow Registry", 
this, e);
         }
     }
@@ -3041,12 +3236,12 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
 
             final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
-            final VersionedProcessGroup versionedGroup = 
mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true);
+            final VersionedProcessGroup versionedGroup = 
mapper.mapProcessGroup(this, controllerServiceProvider, 
flowController.getFlowRegistryClient(), true);
 
             final ComparableDataFlow localFlow = new 
StandardComparableDataFlow("Local Flow", versionedGroup);
             final ComparableDataFlow remoteFlow = new 
StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
 
-            final FlowComparator flowComparator = new 
StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor());
+            final FlowComparator flowComparator = new 
StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new 
StaticDifferenceDescriptor());
             final FlowComparison flowComparison = flowComparator.compare();
 
             final Set<String> updatedVersionedComponentIds = new HashSet<>();
@@ -3055,6 +3250,25 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                     continue;
                 }
 
+                // If this update adds a new Controller Service, then we need 
to check if the service already exists at a higher level
+                // and if so compare our VersionedControllerService to the 
existing service.
+                if (diff.getDifferenceType() == 
DifferenceType.COMPONENT_ADDED) {
+                    final VersionedComponent component = diff.getComponentA() 
== null ? diff.getComponentB() : diff.getComponentA();
+                    if (ComponentType.CONTROLLER_SERVICE == 
component.getComponentType()) {
+                        final ControllerServiceNode serviceNode = 
getVersionedControllerService(this, component.getIdentifier());
+                        if (serviceNode != null) {
+                            final VersionedControllerService versionedService 
= mapper.mapControllerService(serviceNode, controllerServiceProvider);
+                            final Set<FlowDifference> differences = 
flowComparator.compareControllerServices(versionedService, 
(VersionedControllerService) component);
+
+                            if (!differences.isEmpty()) {
+                                
updatedVersionedComponentIds.add(component.getIdentifier());
+                            }
+
+                            continue;
+                        }
+                    }
+                }
+
                 final VersionedComponent component = diff.getComponentA() == 
null ? diff.getComponentB() : diff.getComponentA();
                 updatedVersionedComponentIds.add(component.getIdentifier());
 
@@ -3081,6 +3295,35 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
+    private Set<String> getAncestorGroupServiceIds() {
+        final Set<String> ancestorServiceIds;
+        ProcessGroup parentGroup = getParent();
+
+        if (parentGroup == null) {
+            ancestorServiceIds = Collections.emptySet();
+        } else {
+            ancestorServiceIds = 
parentGroup.getControllerServices(true).stream()
+                .map(ControllerServiceNode::getIdentifier)
+                .collect(Collectors.toSet());
+        }
+
+        return ancestorServiceIds;
+    }
+
+    private ControllerServiceNode getVersionedControllerService(final 
ProcessGroup group, final String versionedComponentId) {
+        if (group == null) {
+            return null;
+        }
+
+        for (final ControllerServiceNode serviceNode : 
group.getControllerServices(false)) {
+            if (serviceNode.getVersionedComponentId().isPresent() && 
serviceNode.getVersionedComponentId().get().equals(versionedComponentId)) {
+                return serviceNode;
+            }
+        }
+
+        return getVersionedControllerService(group.getParent(), 
versionedComponentId);
+    }
+
     private Set<String> getKnownVariableNames() {
         final Set<String> variableNames = new HashSet<>();
         populateKnownVariableNames(this, variableNames);
@@ -3159,6 +3402,44 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             group.setVersionControlInformation(vci, Collections.emptyMap());
         }
 
+
+        // Controller Services
+        // Controller Services have to be handled a bit differently than other 
components. This is because Processors and Controller
+        // Services may reference other Controller Services. Since we may be 
adding Service A, which depends on Service B, before adding
+        // Service B, we need to ensure that we create all Controller Services 
first and then call updateControllerService for each
+        // Controller Service. This way, we ensure that all services have been 
created before setting the properties. This allows us to
+        // properly obtain the correct mapping of Controller Service 
VersionedComponentID to Controller Service instance id.
+        final Map<String, ControllerServiceNode> servicesByVersionedId = 
group.getControllerServices(false).stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
+
+        final Set<String> controllerServicesRemoved = new 
HashSet<>(servicesByVersionedId.keySet());
+
+        final Map<ControllerServiceNode, VersionedControllerService> services 
= new HashMap<>();
+
+        // Add any Controller Service that does not yet exist.
+        for (final VersionedControllerService proposedService : 
proposed.getControllerServices()) {
+            ControllerServiceNode service = 
servicesByVersionedId.get(proposedService.getIdentifier());
+            if (service == null) {
+                service = addControllerService(group, proposedService, 
componentIdSeed);
+                LOG.info("Added {} to {}", service, this);
+            }
+
+            services.put(service, proposedService);
+        }
+
+        // Update all of the Controller Services to match the 
VersionedControllerService
+        for (final Map.Entry<ControllerServiceNode, 
VersionedControllerService> entry : services.entrySet()) {
+            final ControllerServiceNode service = entry.getKey();
+            final VersionedControllerService proposedService = 
entry.getValue();
+
+            if 
(updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
+                updateControllerService(service, proposedService);
+                LOG.info("Updated {}", service);
+            }
+
+            controllerServicesRemoved.remove(proposedService.getIdentifier());
+        }
+
         // Child groups
         final Map<String, ProcessGroup> childGroupsByVersionedId = 
group.getProcessGroups().stream()
             .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
@@ -3179,26 +3460,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             childGroupsRemoved.remove(proposedChildGroup.getIdentifier());
         }
 
-
-        // Controller Services
-        final Map<String, ControllerServiceNode> servicesByVersionedId = 
group.getControllerServices(false).stream()
-            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
-        final Set<String> controllerServicesRemoved = new 
HashSet<>(servicesByVersionedId.keySet());
-
-        for (final VersionedControllerService proposedService : 
proposed.getControllerServices()) {
-            final ControllerServiceNode service = 
servicesByVersionedId.get(proposedService.getIdentifier());
-            if (service == null) {
-                final ControllerServiceNode added = 
addControllerService(group, proposedService, componentIdSeed);
-                LOG.info("Added {} to {}", added, this);
-            } else if 
(updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
-                updateControllerService(service, proposedService);
-                LOG.info("Updated {}", service);
-            }
-
-            controllerServicesRemoved.remove(proposedService.getIdentifier());
-        }
-
-
         // Funnels
         final Map<String, Funnel> funnelsByVersionedId = 
group.getFunnels().stream()
             .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(component.getIdentifier()), 
Function.identity()));
@@ -3608,7 +3869,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         service.setAnnotationData(proposed.getAnnotationData());
         service.setComments(proposed.getComments());
         service.setName(proposed.getName());
-        service.setProperties(populatePropertiesMap(service.getProperties(), 
proposed.getProperties()));
+        service.setProperties(populatePropertiesMap(service.getProperties(), 
proposed.getProperties(), proposed.getPropertyDescriptors(), 
service.getProcessGroup()));
 
         if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
             final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
@@ -3728,7 +3989,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
         processor.setName(proposed.getName());
         processor.setPenalizationPeriod(proposed.getPenaltyDuration());
-        
processor.setProperties(populatePropertiesMap(processor.getProperties(), 
proposed.getProperties()));
+        
processor.setProperties(populatePropertiesMap(processor.getProperties(), 
proposed.getProperties(), proposed.getPropertyDescriptors(), 
processor.getProcessGroup()));
         processor.setRunDuration(proposed.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
         processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
         
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
@@ -3745,19 +4006,60 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
 
-    private Map<String, String> populatePropertiesMap(final 
Map<PropertyDescriptor, String> currentProperties, final Map<String, String> 
proposedProperties) {
+    private Map<String, String> populatePropertiesMap(final 
Map<PropertyDescriptor, String> currentProperties, final Map<String, String> 
proposedProperties,
+        final Map<String, VersionedPropertyDescriptor> proposedDescriptors, 
final ProcessGroup group) {
+
         final Map<String, String> fullPropertyMap = new HashMap<>();
         for (final PropertyDescriptor property : currentProperties.keySet()) {
             fullPropertyMap.put(property.getName(), null);
         }
 
         if (proposedProperties != null) {
-            fullPropertyMap.putAll(proposedProperties);
+            for (final Map.Entry<String, String> entry : 
proposedProperties.entrySet()) {
+                final String propertyName = entry.getKey();
+                final VersionedPropertyDescriptor descriptor = 
proposedDescriptors.get(propertyName);
+
+                String value;
+                if (descriptor != null && 
descriptor.getIdentifiesControllerService()) {
+                    // Property identifies a Controller Service. So the value 
that we want to assign is not the value given.
+                    // The value given is instead the Versioned Component ID 
of the Controller Service. We want to resolve this
+                    // to the instance ID of the Controller Service.
+                    final String serviceVersionedComponentId = 
entry.getValue();
+                    final String instanceId = 
getServiceInstanceId(serviceVersionedComponentId, group);
+                    value = instanceId == null ? serviceVersionedComponentId : 
instanceId;
+                } else {
+                    value = entry.getValue();
+                }
+
+                fullPropertyMap.put(propertyName, value);
+            }
         }
 
         return fullPropertyMap;
     }
 
+    private String getServiceInstanceId(final String 
serviceVersionedComponentId, final ProcessGroup group) {
+        for (final ControllerServiceNode serviceNode : 
group.getControllerServices(false)) {
+            final Optional<String> optionalVersionedId = 
serviceNode.getVersionedComponentId();
+            if (!optionalVersionedId.isPresent()) {
+                continue;
+            }
+
+            final String versionedId = optionalVersionedId.get();
+            if (versionedId.equals(serviceVersionedComponentId)) {
+                return serviceNode.getIdentifier();
+            }
+        }
+
+        final ProcessGroup parent = group.getParent();
+        if (parent == null) {
+            return null;
+        }
+
+        return getServiceInstanceId(serviceVersionedComponentId, parent);
+
+    }
+
     private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup 
destination, final VersionedRemoteProcessGroup proposed, final String 
componentIdSeed) {
         final RemoteProcessGroup rpg = 
flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), 
destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
         rpg.setVersionedComponentId(proposed.getIdentifier());
@@ -3773,12 +4075,12 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
         rpg.setInputPorts(proposed.getInputPorts() == null ? 
Collections.emptySet() : proposed.getInputPorts().stream()
             .map(port -> createPortDescriptor(port, componentIdSeed, 
rpg.getIdentifier()))
-            .collect(Collectors.toSet()));
+            .collect(Collectors.toSet()), false);
         rpg.setName(proposed.getName());
         rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
         rpg.setOutputPorts(proposed.getOutputPorts() == null ? 
Collections.emptySet() : proposed.getOutputPorts().stream()
             .map(port -> createPortDescriptor(port, componentIdSeed, 
rpg.getIdentifier()))
-            .collect(Collectors.toSet()));
+            .collect(Collectors.toSet()), false);
         rpg.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
         rpg.setProxyHost(proposed.getProxyHost());
         rpg.setProxyPort(proposed.getProxyPort());
@@ -3831,12 +4133,12 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
 
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
-        final VersionedProcessGroup versionedGroup = 
mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
+        final VersionedProcessGroup versionedGroup = 
mapper.mapProcessGroup(this, controllerServiceProvider, 
flowController.getFlowRegistryClient(), false);
 
         final ComparableDataFlow currentFlow = new 
StandardComparableDataFlow("Local Flow", versionedGroup);
         final ComparableDataFlow snapshotFlow = new 
StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
 
-        final FlowComparator flowComparator = new 
StandardFlowComparator(snapshotFlow, currentFlow, new 
EvolvingDifferenceDescriptor());
+        final FlowComparator flowComparator = new 
StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), 
new EvolvingDifferenceDescriptor());
         final FlowComparison comparison = flowComparator.compare();
         final Set<FlowDifference> differences = comparison.getDifferences();
         final Set<FlowDifference> functionalDifferences = differences.stream()

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
new file mode 100644
index 0000000..f362c1e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import java.util.Set;
+
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+
+class StandardVersionedFlowStatus implements VersionedFlowStatus {
+    private final VersionedFlowState state;
+    private final String explanation;
+    private final Set<FlowDifference> currentDifferences;
+
+    StandardVersionedFlowStatus(final VersionedFlowState state, final String 
explanation, final Set<FlowDifference> differences) {
+        this.state = state;
+        this.explanation = explanation;
+        this.currentDifferences = differences;
+    }
+
+    @Override
+    public VersionedFlowState getState() {
+        return state;
+    }
+
+    @Override
+    public String getStateExplanation() {
+        return explanation;
+    }
+
+    Set<FlowDifference> getCurrentDifferences() {
+        return currentDifferences;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
index 92a4166..106d19a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
@@ -34,6 +34,7 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
     private volatile VersionedProcessGroup flowSnapshot;
     private volatile boolean modified;
     private volatile boolean current;
+    private final VersionedFlowStatus status;
 
     public static class Builder {
         private String registryIdentifier;
@@ -47,6 +48,7 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
         private VersionedProcessGroup flowSnapshot;
         private Boolean modified = null;
         private Boolean current = null;
+        private VersionedFlowStatus status;
 
         public Builder registryId(String registryId) {
             this.registryIdentifier = registryId;
@@ -103,6 +105,11 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
             return this;
         }
 
+        public Builder status(final VersionedFlowStatus status) {
+            this.status = status;
+            return this;
+        }
+
         public static Builder fromDto(VersionControlInformationDTO dto) {
             Builder builder = new Builder();
             builder.registryId(dto.getRegistryId())
@@ -126,7 +133,7 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
             Objects.requireNonNull(version, "Version must be specified");
 
             final StandardVersionControlInformation svci = new 
StandardVersionControlInformation(registryIdentifier, registryName,
-                bucketIdentifier, flowIdentifier, version, flowSnapshot, 
modified, current);
+                bucketIdentifier, flowIdentifier, version, flowSnapshot, 
modified, current, status);
 
             svci.setBucketName(bucketName);
             svci.setFlowName(flowName);
@@ -138,7 +145,7 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
 
 
     public StandardVersionControlInformation(final String registryId, final 
String registryName, final String bucketId, final String flowId, final int 
version,
-        final VersionedProcessGroup snapshot, final boolean modified, final 
boolean current) {
+        final VersionedProcessGroup snapshot, final boolean modified, final 
boolean current, final VersionedFlowStatus status) {
         this.registryIdentifier = registryId;
         this.registryName = registryName;
         this.bucketIdentifier = bucketId;
@@ -147,6 +154,7 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
         this.flowSnapshot = snapshot;
         this.modified = modified;
         this.current = current;
+        this.status = status;
     }
 
 
@@ -232,4 +240,9 @@ public class StandardVersionControlInformation implements 
VersionControlInformat
     public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) {
         this.flowSnapshot = flowSnapshot;
     }
+
+    @Override
+    public VersionedFlowStatus getStatus() {
+        return status;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
deleted file mode 100644
index 193bde8..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.registry.flow.mapping;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import org.apache.nifi.registry.flow.BatchSize;
-import org.apache.nifi.registry.flow.Bundle;
-import org.apache.nifi.registry.flow.ComponentType;
-import org.apache.nifi.registry.flow.ConnectableComponent;
-import org.apache.nifi.registry.flow.ConnectableComponentType;
-import org.apache.nifi.registry.flow.ControllerServiceAPI;
-import org.apache.nifi.registry.flow.PortType;
-import org.apache.nifi.registry.flow.Position;
-import org.apache.nifi.registry.flow.VersionedConnection;
-import org.apache.nifi.registry.flow.VersionedControllerService;
-import org.apache.nifi.registry.flow.VersionedFunnel;
-import org.apache.nifi.registry.flow.VersionedLabel;
-import org.apache.nifi.registry.flow.VersionedPort;
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.VersionedProcessor;
-import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
-import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
-import org.apache.nifi.web.api.dto.BatchSettingsDTO;
-import org.apache.nifi.web.api.dto.BundleDTO;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceApiDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-
-
-public class NiFiRegistryDtoMapper {
-    // We need to keep a mapping of component id to versionedComponentId as we 
transform these objects. This way, when
-    // we call #mapConnectable, instead of generating a new UUID for the 
ConnectableComponent, we can lookup the 'versioned'
-    // identifier based on the comopnent's actual id. We do connections last, 
so that all components will already have been
-    // created before attempting to create the connection, where the 
ConnectableDTO is converted.
-    private Map<String, String> versionedComponentIds = new HashMap<>();
-
-    public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) {
-        versionedComponentIds.clear();
-        return mapGroup(dto);
-    }
-
-    private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) {
-        final VersionedProcessGroup versionedGroup = new 
VersionedProcessGroup();
-        versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), 
dto.getId()));
-        versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        versionedGroup.setName(dto.getName());
-        versionedGroup.setComments(dto.getComments());
-        versionedGroup.setPosition(mapPosition(dto.getPosition()));
-
-        final FlowSnippetDTO contents = dto.getContents();
-
-        
versionedGroup.setControllerServices(contents.getControllerServices().stream()
-            .map(this::mapControllerService)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setFunnels(contents.getFunnels().stream()
-            .map(this::mapFunnel)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setInputPorts(contents.getInputPorts().stream()
-            .map(this::mapPort)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setOutputPorts(contents.getOutputPorts().stream()
-            .map(this::mapPort)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setLabels(contents.getLabels().stream()
-            .map(this::mapLabel)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setProcessors(contents.getProcessors().stream()
-            .map(this::mapProcessor)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        
versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream()
-            .map(this::mapRemoteProcessGroup)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setProcessGroups(contents.getProcessGroups().stream()
-            .map(this::mapGroup)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setConnections(contents.getConnections().stream()
-            .map(this::mapConnection)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        return versionedGroup;
-    }
-
-    private String getId(final String currentVersionedId, final String 
componentId) {
-        final String versionedId;
-        if (currentVersionedId == null) {
-            versionedId = 
UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
-        } else {
-            versionedId = currentVersionedId;
-        }
-
-        versionedComponentIds.put(componentId, versionedId);
-        return versionedId;
-    }
-
-    private String getGroupId(final String groupId) {
-        return versionedComponentIds.get(groupId);
-    }
-
-    public VersionedConnection mapConnection(final ConnectionDTO dto) {
-        final VersionedConnection connection = new VersionedConnection();
-        connection.setIdentifier(getId(dto.getVersionedComponentId(), 
dto.getId()));
-        connection.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        connection.setName(dto.getName());
-        
connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold());
-        
connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold());
-        connection.setFlowFileExpiration(dto.getFlowFileExpiration());
-        connection.setLabelIndex(dto.getLabelIndex());
-        connection.setPosition(mapPosition(dto.getPosition()));
-        connection.setPrioritizers(dto.getPrioritizers());
-        connection.setSelectedRelationships(dto.getSelectedRelationships());
-        connection.setzIndex(dto.getzIndex());
-
-        connection.setBends(dto.getBends().stream()
-            .map(this::mapPosition)
-            .collect(Collectors.toList()));
-
-        connection.setSource(mapConnectable(dto.getSource()));
-        connection.setDestination(mapConnectable(dto.getDestination()));
-
-        return connection;
-    }
-
-    public ConnectableComponent mapConnectable(final ConnectableDTO dto) {
-        final ConnectableComponent component = new ConnectableComponent();
-
-        final String versionedId = dto.getVersionedComponentId();
-        if (versionedId == null) {
-            final String resolved = versionedComponentIds.get(dto.getId());
-            if (resolved == null) {
-                throw new IllegalArgumentException("Unable to map Connectable 
Component with identifier " + dto.getId() + " to any version-controlled 
component");
-            }
-
-            component.setId(resolved);
-        } else {
-            component.setId(versionedId);
-        }
-
-        component.setComments(dto.getComments());
-        component.setGroupId(dto.getGroupId());
-        component.setName(dto.getName());
-        component.setType(ConnectableComponentType.valueOf(dto.getType()));
-        return component;
-    }
-
-    public VersionedControllerService mapControllerService(final 
ControllerServiceDTO dto) {
-        final VersionedControllerService service = new 
VersionedControllerService();
-        service.setIdentifier(getId(dto.getVersionedComponentId(), 
dto.getId()));
-        service.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        service.setName(dto.getName());
-        service.setAnnotationData(dto.getAnnotationData());
-        service.setBundle(mapBundle(dto.getBundle()));
-        service.setComments(dto.getComments());
-        
service.setControllerServiceApis(dto.getControllerServiceApis().stream()
-            .map(this::mapControllerServiceApi)
-            .collect(Collectors.toList()));
-        service.setProperties(dto.getProperties());
-        service.setType(dto.getType());
-        return null;
-    }
-
-    private Bundle mapBundle(final BundleDTO dto) {
-        final Bundle bundle = new Bundle();
-        bundle.setGroup(dto.getGroup());
-        bundle.setArtifact(dto.getArtifact());
-        bundle.setVersion(dto.getVersion());
-        return bundle;
-    }
-
-    private ControllerServiceAPI mapControllerServiceApi(final 
ControllerServiceApiDTO dto) {
-        final ControllerServiceAPI api = new ControllerServiceAPI();
-        api.setBundle(mapBundle(dto.getBundle()));
-        api.setType(dto.getType());
-        return api;
-    }
-
-    public VersionedFunnel mapFunnel(final FunnelDTO dto) {
-        final VersionedFunnel funnel = new VersionedFunnel();
-        funnel.setIdentifier(getId(dto.getVersionedComponentId(), 
dto.getId()));
-        funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        funnel.setPosition(mapPosition(dto.getPosition()));
-        return funnel;
-    }
-
-    public VersionedLabel mapLabel(final LabelDTO dto) {
-        final VersionedLabel label = new VersionedLabel();
-        label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        label.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        label.setHeight(dto.getHeight());
-        label.setWidth(dto.getWidth());
-        label.setLabel(dto.getLabel());
-        label.setPosition(mapPosition(dto.getPosition()));
-        label.setStyle(dto.getStyle());
-        return label;
-    }
-
-    public VersionedPort mapPort(final PortDTO dto) {
-        final VersionedPort port = new VersionedPort();
-        port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        port.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        port.setComments(dto.getComments());
-        
port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
-        port.setName(dto.getName());
-        port.setPosition(mapPosition(dto.getPosition()));
-        port.setType(PortType.valueOf(dto.getType()));
-        return port;
-    }
-
-    public Position mapPosition(final PositionDTO dto) {
-        final Position position = new Position();
-        position.setX(dto.getX());
-        position.setY(dto.getY());
-        return position;
-    }
-
-    public VersionedProcessor mapProcessor(final ProcessorDTO dto) {
-        final ProcessorConfigDTO config = dto.getConfig();
-
-        final VersionedProcessor processor = new VersionedProcessor();
-        processor.setIdentifier(getId(dto.getVersionedComponentId(), 
dto.getId()));
-        processor.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        processor.setType(dto.getType());
-        processor.setAnnotationData(config.getAnnotationData());
-        
processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships());
-        processor.setBulletinLevel(config.getBulletinLevel());
-        processor.setBundle(mapBundle(dto.getBundle()));
-        processor.setComments(config.getComments());
-        
processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount());
-        processor.setExecutionNode(config.getExecutionNode());
-        processor.setName(dto.getName());
-        processor.setPenaltyDuration(config.getPenaltyDuration());
-        processor.setPosition(mapPosition(dto.getPosition()));
-        processor.setProperties(config.getProperties());
-        processor.setRunDurationMillis(config.getRunDurationMillis());
-        processor.setSchedulingPeriod(config.getSchedulingPeriod());
-        processor.setSchedulingStrategy(config.getSchedulingStrategy());
-        processor.setStyle(dto.getStyle());
-        processor.setYieldDuration(config.getYieldDuration());
-        return processor;
-    }
-
-    public VersionedRemoteProcessGroup mapRemoteProcessGroup(final 
RemoteProcessGroupDTO dto) {
-        final VersionedRemoteProcessGroup rpg = new 
VersionedRemoteProcessGroup();
-        rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        rpg.setComments(dto.getComments());
-        rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout());
-        rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface());
-        rpg.setName(dto.getName());
-        rpg.setInputPorts(dto.getContents().getInputPorts().stream()
-            .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT))
-            .collect(Collectors.toSet()));
-        rpg.setOutputPorts(dto.getContents().getOutputPorts().stream()
-            .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT))
-            .collect(Collectors.toSet()));
-        rpg.setPosition(mapPosition(dto.getPosition()));
-        rpg.setProxyHost(dto.getProxyHost());
-        rpg.setProxyPort(dto.getProxyPort());
-        rpg.setProxyUser(dto.getProxyUser());
-        rpg.setTargetUri(dto.getTargetUri());
-        rpg.setTargetUris(dto.getTargetUris());
-        rpg.setTransportProtocol(dto.getTransportProtocol());
-        rpg.setYieldDuration(dto.getYieldDuration());
-        return rpg;
-    }
-
-    public VersionedRemoteGroupPort mapRemotePort(final 
RemoteProcessGroupPortDTO dto, final ComponentType componentType) {
-        final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort();
-        port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        port.setGroupIdentifier(getGroupId(dto.getGroupId()));
-        port.setComments(dto.getComments());
-        
port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
-        port.setRemoteGroupId(dto.getGroupId());
-        port.setName(dto.getName());
-        port.setUseCompression(dto.getUseCompression());
-        port.setBatchSize(mapBatchSettings(dto.getBatchSettings()));
-        port.setTargetId(dto.getTargetId());
-        port.setComponentType(componentType);
-        return port;
-    }
-
-    private BatchSize mapBatchSettings(final BatchSettingsDTO dto) {
-        final BatchSize batchSize = new BatchSize();
-        batchSize.setCount(dto.getCount());
-        batchSize.setDuration(dto.getDuration());
-        batchSize.setSize(dto.getSize());
-        return batchSize;
-    }
-}

Reply via email to