markap14 commented on a change in pull request #5514:
URL: https://github.com/apache/nifi/pull/5514#discussion_r752420922



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
##########
@@ -0,0 +1,2050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.EncryptionException;
+import org.apache.nifi.flow.BatchSize;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.remote.PublicPort;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FlowDifferenceFilters;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
/**
 * Synchronizes a live {@link ProcessGroup} with a proposed {@link VersionedProcessGroup}:
 * compares the currently loaded flow against the proposed flow, then adds, updates, and removes
 * components so that the live group matches the proposal.
 */
public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronizer {
    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroupSynchronizer.class);

    // Suffix for the id of the temporary funnel used as a placeholder connection destination
    // while components are being added/removed (see removeTemporaryFunnel / updateConnectionDestinations).
    private static final String TEMP_FUNNEL_ID_SUFFIX = "-temp-funnel";

    // NOTE(review): presumably the markers wrapping encrypted property values (e.g. "enc{...}");
    // the code that applies them is outside this chunk — confirm before relying on this.
    public static final String ENC_PREFIX = "enc{";
    public static final String ENC_SUFFIX = "}";

    // Services (flow manager, schedulers, id generators, registry client, ...) needed to perform the synchronization.
    private final ProcessGroupSynchronizationContext context;

    // Versioned ids of components that the flow comparison found to differ; only these are updated in place.
    private final Set<String> updatedVersionedComponentIds = new HashSet<>();

    // Variable names that existed before the update began; populated when existing variables must not be overwritten.
    private Set<String> preExistingVariables = new HashSet<>();
    private GroupSynchronizationOptions syncOptions;
+
    /**
     * Creates a synchronizer that uses the given context for all external services it needs.
     *
     * @param context supplies the flow manager, component scheduler, id generator, registry client, etc.
     */
    public StandardProcessGroupSynchronizer(final ProcessGroupSynchronizationContext context) {
        this.context = context;
    }
+
    /**
     * Replaces the set of variable names considered pre-existing. Used when recursing into child groups
     * so that the child synchronizer shares the parent's view of which variables already existed.
     * Note: the given set is stored by reference, not copied.
     */
    private void setPreExistingVariables(final Set<String> preExistingVariables) {
        this.preExistingVariables = preExistingVariables;
    }
+
    /**
     * Replaces the contents of the updated-component-id set. The field itself is final, so the contents
     * are copied in rather than the reference being swapped (unlike {@code setPreExistingVariables}).
     */
    private void setUpdatedVersionedComponentIds(final Set<String> updatedVersionedComponentIds) {
        this.updatedVersionedComponentIds.clear();
        this.updatedVersionedComponentIds.addAll(updatedVersionedComponentIds);
    }
+
    /**
     * Sets the options (update settings, descendant handling, variable handling, etc.) that govern
     * how the next synchronization is performed.
     */
    public void setSynchronizationOptions(final GroupSynchronizationOptions syncOptions) {
        this.syncOptions = syncOptions;
    }
+
    /**
     * Synchronizes the given Process Group with the proposed flow snapshot. Maps the live group to its
     * versioned form, diffs it against the proposal, records which components actually differ (so only
     * those are updated), and then delegates to the private {@code synchronize} to apply the changes.
     *
     * @param group the live Process Group to update
     * @param proposedSnapshot the proposed flow, including its Parameter Contexts
     * @param options controls which aspects (settings, variables, descendants) are updated
     * @throws ProcessorInstantiationException if a Processor in the proposed flow cannot be instantiated
     */
    @Override
    public void synchronize(final ProcessGroup group, final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions options) throws ProcessorInstantiationException {

        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);

        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Currently Loaded Flow", versionedGroup);
        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents());

        final PropertyDecryptor decryptor = options.getPropertyDecryptor();
        final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt);
        final FlowComparison flowComparison = flowComparator.compare();

        updatedVersionedComponentIds.clear();
        setSynchronizationOptions(options);

        for (final FlowDifference diff : flowComparison.getDifferences()) {
            // Ignore differences that are artifacts rather than real changes.
            if (FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff, context.getFlowManager())) {
                continue;
            }
            if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
                continue;
            }

            // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
            // and if so compare our VersionedControllerService to the existing service.
            if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
                final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
                if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) {
                    final ControllerServiceNode serviceNode = getVersionedControllerService(group, component.getIdentifier());
                    if (serviceNode != null) {
                        final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, context.getControllerServiceProvider(),
                            Collections.singleton(serviceNode.getProcessGroupIdentifier()), new HashMap<>());
                        final Set<FlowDifference> differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component);

                        if (!differences.isEmpty()) {
                            updatedVersionedComponentIds.add(component.getIdentifier());
                        }

                        continue;
                    }
                }
            }

            final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
            updatedVersionedComponentIds.add(component.getIdentifier());

            // A change to a remote port also implies a change to its enclosing Remote Process Group.
            if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) {
                final String remoteGroupId = ((VersionedRemoteGroupPort) component).getRemoteGroupId();
                updatedVersionedComponentIds.add(remoteGroupId);
            }
        }

        if (LOG.isInfoEnabled()) {
            final String differencesByLine = flowComparison.getDifferences().stream()
                .map(FlowDifference::toString)
                .collect(Collectors.joining("\n"));

            LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, proposedSnapshot,
                flowComparison.getDifferences().size(), differencesByLine);
        }

        final Set<String> knownVariables = getKnownVariableNames(group);

        preExistingVariables.clear();

        // If we don't want to update existing variables, we need to populate the pre-existing variables so that we know which variables already existed.
        // We can't do this when updating the Variable Registry for a Process Group because variables are inherited, and the variables of the parent group
        // may already have been updated when we get to the point of updating a child's Variable Registry. As a result, we build up a Set of all known
        // Variables before we update the Variable Registries.
        if (!options.isUpdateExistingVariables()) {
            preExistingVariables.addAll(knownVariables);
        }

        synchronize(group, proposedSnapshot.getFlowContents(), proposedSnapshot.getParameterContexts());
        group.onComponentModified();
    }
+
    /**
     * Applies the proposed Versioned Process Group to the given live Process Group, adding, updating,
     * and removing components in a strict order of operations (documented inline below). The Component
     * Scheduler is paused for the duration so that components whose proposed state is RUNNING are not
     * started before the Connections attached to them have been updated.
     *
     * @param group the live Process Group to update
     * @param proposed the proposed (versioned) form of the group
     * @param versionedParameterContexts the Parameter Contexts accompanying the proposed flow, keyed by name
     * @throws ProcessorInstantiationException if a Processor in the proposed flow cannot be instantiated
     */
    private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts)
                    throws ProcessorInstantiationException {

        // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
        // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
        // updating the Connection will fail because the Connection's source & destination must both be stopped in order to
        // update it. To avoid that, we simply pause the scheduler. Once all updates have been made, we will resume the scheduler.
        context.getComponentScheduler().pause();

        group.setComments(proposed.getComments());

        if (syncOptions.isUpdateSettings()) {
            if (proposed.getName() != null) {
                group.setName(proposed.getName());
            }

            if (proposed.getPosition() != null) {
                group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
            }
        }

        updateParameterContext(group, proposed, versionedParameterContexts, context.getComponentIdGenerator());
        updateVariableRegistry(group, proposed);

        // Fall back to UNBOUNDED / STREAM_WHEN_AVAILABLE when the proposed flow does not specify these values.
        final FlowFileConcurrency flowFileConcurrency = proposed.getFlowFileConcurrency() == null ? FlowFileConcurrency.UNBOUNDED :
            FlowFileConcurrency.valueOf(proposed.getFlowFileConcurrency());
        group.setFlowFileConcurrency(flowFileConcurrency);

        final FlowFileOutboundPolicy outboundPolicy = proposed.getFlowFileOutboundPolicy() == null ? FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE :
            FlowFileOutboundPolicy.valueOf(proposed.getFlowFileOutboundPolicy());
        group.setFlowFileOutboundPolicy(outboundPolicy);

        group.setDefaultFlowFileExpiration(proposed.getDefaultFlowFileExpiration());
        group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
        group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());

        final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
        if (remoteCoordinates == null) {
            group.disconnectVersionControl(false);
        } else {
            final String registryId = context.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
            final String bucketId = remoteCoordinates.getBucketId();
            final String flowId = remoteCoordinates.getFlowId();
            final int version = remoteCoordinates.getVersion();

            final FlowRegistry flowRegistry = context.getFlowRegistryClient().getFlowRegistry(registryId);
            final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();

            // A null 'latest' flag means we could not determine whether this version is up to date.
            final VersionedFlowState flowState;
            if (remoteCoordinates.getLatest() == null) {
                flowState = VersionedFlowState.SYNC_FAILURE;
            } else {
                flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
            }

            final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
                .registryId(registryId)
                .registryName(registryName)
                .bucketId(bucketId)
                .bucketName(bucketId)
                .flowId(flowId)
                .flowName(flowId)
                .version(version)
                .flowSnapshot(syncOptions.isUpdateGroupVersionControlSnapshot() ? proposed : null)
                .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
                .build();

            group.setVersionControlInformation(vci, Collections.emptyMap());
        }

        // In order to properly update all of the components, we have to follow a specific order of operations, in order to ensure that
        // we don't try to perform illegal operations like removing a Processor that has an incoming connection (which would throw an
        // IllegalStateException and fail).
        //
        // The sequence of steps / order of operations are as follows:
        //
        // 1. Remove any Controller Services that do not exist in the proposed group
        // 2. Add any Controller Services that are in the proposed group that are not in the current flow
        // 3. Update Controller Services to match those in the proposed group
        // 4. Remove any connections that do not exist in the proposed group
        // 5. For any connection that does exist, if the proposed group has a different destination for the connection, update the destination.
        //    If the new destination does not yet exist in the flow, set the destination as some temporary component.
        // 6. Remove any other components that do not exist in the proposed group.
        // 7. Add any components, other than Connections, that exist in the proposed group but not in the current flow
        // 8. Update components, other than Connections, to match those in the proposed group
        // 9. Add connections that exist in the proposed group that are not in the current flow
        // 10. Update connections to match those in the proposed group
        // 11. Delete the temporary destination that was created above

        // Keep track of any processors that have been updated to have auto-terminated relationships so that we can set those
        // auto-terminated relationships after we've handled creating/deleting necessary connections.
        final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();

        // During the flow update, we will use temporary names for process group ports. This is because port names must be
        // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
        // For example, if a process group update involves removing/renaming port A, and then adding/updating port B where B is given
        // A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
        // same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
        // during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
        final Map<Port, String> proposedPortFinalNames = new HashMap<>();

        // Controller Services
        final Map<String, ControllerServiceNode> controllerServicesByVersionedId = componentsById(group, grp -> grp.getControllerServices(false),
            ControllerServiceNode::getIdentifier, ControllerServiceNode::getVersionedComponentId);
        removeMissingControllerServices(group, proposed, controllerServicesByVersionedId);
        synchronizeControllerServices(group, proposed, controllerServicesByVersionedId);

        // Remove any connections that are not in the Proposed Process Group
        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
        // to remove a component if it has a connection going to it!
        final Map<String, Connection> connectionsByVersionedId = componentsById(group, ProcessGroup::getConnections, Connection::getIdentifier, Connection::getVersionedComponentId);
        removeMissingConnections(group, proposed, connectionsByVersionedId);

        // Before we remove other components, we have to ensure that the Connections have the appropriate destinations. Otherwise, we could have a situation
        // where Connection A used to have a destination of B but now has a destination of C, which doesn't exist yet. And B doesn't exist in the new flow.
        // This is a problem because we cannot remove B, since it has an incoming Connection. And we can't change the destination to C because C hasn't been
        // added yet. As a result, we need a temporary location to set as the Connection's destination. So we create a Funnel for this and then we can update
        // all Connections to have the appropriate destinations.
        final Set<String> connectionsWithTempDestination = updateConnectionDestinations(group, proposed, connectionsByVersionedId);

        try {
            final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
            final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
            final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
            final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
            final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
            final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
                RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
            final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);

            removeMissingProcessors(group, proposed, processorsByVersionedId);
            removeMissingFunnels(group, proposed, funnelsByVersionedId);
            removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
            removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
            removeMissingLabels(group, proposed, labelsByVersionedId);
            removeMissingRpg(group, proposed, rpgsByVersionedId);
            removeMissingChildGroups(group, proposed, childGroupsByVersionedId);

            // Synchronize Child Process Groups
            synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId);

            synchronizeFunnels(group, proposed, funnelsByVersionedId);
            synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
            synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
            synchronizeLabels(group, proposed, labelsByVersionedId);
            synchronizeProcessors(group, proposed, autoTerminatedRelationships, processorsByVersionedId);
            synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
        } finally {
            // Make sure that we reset the connections
            restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
            removeTemporaryFunnel(group);
        }

        // We can now add in any necessary connections, since all connectable components have now been created.
        synchronizeConnections(group, proposed, connectionsByVersionedId);

        // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
        // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
        // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
        // Connection for that relationship exists. This will throw an Exception.
        autoTerminatedRelationships.forEach(ProcessorNode::setAutoTerminatedRelationships);

        // All ports have now been added/removed as necessary. We can now resolve the port names.
        updatePortsToFinalNames(proposedPortFinalNames);

        // Start all components that are queued up to be started now
        context.getComponentScheduler().resume();
    }
+
+    private void synchronizeChildGroups(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> 
versionedParameterContexts,
+                                        final Map<String, ProcessGroup> 
childGroupsByVersionedId) throws ProcessorInstantiationException {
+
+        for (final VersionedProcessGroup proposedChildGroup : 
proposed.getProcessGroups()) {
+            final ProcessGroup childGroup = 
childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
+            final VersionedFlowCoordinates childCoordinates = 
proposedChildGroup.getVersionedFlowCoordinates();
+
+            // if there is a nested process group that is versioned 
controlled, make sure get the param contexts that go with that snapshot
+            // instead of the ones from the parent which would have been 
passed in to this method
+            Map<String, VersionedParameterContext> childParameterContexts = 
versionedParameterContexts;
+            if (childCoordinates != null && 
syncOptions.isUpdateDescendantVersionedFlows()) {
+                final String childParameterContextName = 
proposedChildGroup.getParameterContextName();
+                if (childParameterContextName != null && 
!versionedParameterContexts.containsKey(childParameterContextName)) {
+                    childParameterContexts = 
getVersionedParameterContexts(childCoordinates);
+                } else {
+                    childParameterContexts = versionedParameterContexts;
+                }
+            }
+
+            if (childGroup == null) {
+                final ProcessGroup added = addProcessGroup(group, 
proposedChildGroup, context.getComponentIdGenerator(), preExistingVariables, 
childParameterContexts);
+                context.getFlowManager().onProcessGroupAdded(added);
+                
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+                LOG.info("Added {} to {}", added, group);
+            } else if (childCoordinates == null || 
syncOptions.isUpdateDescendantVersionedFlows()) {
+
+                final StandardProcessGroupSynchronizer sync = new 
StandardProcessGroupSynchronizer(context);
+                sync.setPreExistingVariables(preExistingVariables);
+                
sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
+                final GroupSynchronizationOptions options = 
GroupSynchronizationOptions.Builder.from(syncOptions)
+                    .updateGroupSettings(true)
+                    .build();
+
+                sync.setSynchronizationOptions(options);
+                sync.synchronize(childGroup, proposedChildGroup, 
childParameterContexts);
+
+                LOG.info("Updated {}", childGroup);
+            }
+        }
+    }
+
    /**
     * Adds, links, updates, and (asynchronously) enables Controller Services so the group matches the proposal.
     * Services are created first in one pass, then updated in later passes, because services may reference one
     * another and all instances must exist before properties referencing them can be resolved.
     */
    private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId) {
        // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
        // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
        // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
        // Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
        // properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
        final Map<ControllerServiceNode, VersionedControllerService> services = new HashMap<>();

        // Add any Controller Service that does not yet exist.
        final Map<String, ControllerServiceNode> servicesAdded = new HashMap<>();
        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
            ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
            if (service == null) {
                service = addControllerService(group, proposedService.getIdentifier(), proposedService.getInstanceIdentifier(),
                    proposedService.getType(), proposedService.getBundle(), context.getComponentIdGenerator());

                LOG.info("Added {} to {}", service, group);
                servicesAdded.put(proposedService.getIdentifier(), service);
            }

            services.put(service, proposedService);
        }

        // Because we don't know what order to instantiate the Controller Services, it's possible that we have two services such that Service A references Service B.
        // If Service A happens to get created before Service B, the identifiers won't get matched up. As a result, we now iterate over all created Controller Services
        // and update them again now that all Controller Services have been created at this level, so that the linkage can now be properly established.
        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
            final ControllerServiceNode addedService = servicesAdded.get(proposedService.getIdentifier());
            if (addedService == null) {
                continue;
            }

            updateControllerService(addedService, proposedService);
        }

        // Update all of the Controller Services to match the VersionedControllerService.
        // Only services whose versioned id was flagged as differing during the flow comparison are touched.
        for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
            final ControllerServiceNode service = entry.getKey();
            final VersionedControllerService proposedService = entry.getValue();

            if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
                updateControllerService(service, proposedService);
                LOG.info("Updated {}", service);
            }
        }

        // Determine all Controller Services whose scheduled state indicate they should be enabled.
        final Set<ControllerServiceNode> toEnable = new HashSet<>();
        for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
            if (entry.getValue().getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
                toEnable.add(entry.getKey());
            }
        }

        // Perform Validation so we can enable controller services and then enable them
        toEnable.forEach(ComponentNode::performValidation);

        // Enable the services. We have to do this at the end, after creating all of them, in case one service depends on another and
        // therefore is not valid until all have been created.
        toEnable.forEach(service -> {
            if (service.getState() == ControllerServiceState.DISABLED) {
                LOG.debug("Enabling {}", service);
                context.getControllerServiceProvider().enableControllerServicesAsync(Collections.singleton(service));
            }
        });
    }
+
+    private void removeMissingConnections(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, Connection> 
connectionsByVersionedId) {
+        final Set<String> connectionsRemoved = new 
HashSet<>(connectionsByVersionedId.keySet());
+
+        for (final VersionedConnection proposedConnection : 
proposed.getConnections()) {
+            connectionsRemoved.remove(proposedConnection.getIdentifier());
+        }
+
+        for (final String removedVersionedId : connectionsRemoved) {
+            final Connection connection = 
connectionsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", connection, group);
+            group.removeConnection(connection);
+            context.getFlowManager().onConnectionRemoved(connection);
+        }
+    }
+
+    private void synchronizeConnections(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, Connection> 
connectionsByVersionedId) {
+        // Add and update Connections
+        for (final VersionedConnection proposedConnection : 
proposed.getConnections()) {
+            final Connection connection = 
connectionsByVersionedId.get(proposedConnection.getIdentifier());
+            if (connection == null) {
+                final Connection added = addConnection(group, 
proposedConnection, context.getComponentIdGenerator());
+                context.getFlowManager().onConnectionAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if (isUpdateable(connection)) {
+                // If the connection needs to be updated, then the source and 
destination will already have
+                // been stopped (else, the validation above would fail). So if 
the source or the destination is running,
+                // then we know that we don't need to update the connection.
+                updateConnection(connection, proposedConnection);
+                LOG.info("Updated {}", connection);
+            }
+        }
+    }
+
+    private Set<String> updateConnectionDestinations(final ProcessGroup group, 
final VersionedProcessGroup proposed, final Map<String, Connection> 
connectionsByVersionedId) {
+
+        final Set<String> connectionsWithTempDestination = new HashSet<>();
+        for (final VersionedConnection proposedConnection : 
proposed.getConnections()) {
+            final Connection connection = 
connectionsByVersionedId.get(proposedConnection.getIdentifier());
+            if (connection == null) {
+                continue;
+            }
+
+            // If the Connection's destination didn't change, nothing to do
+            final String destinationVersionId = 
connection.getDestination().getVersionedComponentId().orElse(null);
+            final String proposedDestinationId = 
proposedConnection.getDestination().getId();
+            if (Objects.equals(destinationVersionId, proposedDestinationId)) {
+                continue;
+            }
+
+            // Find the destination of the connection. If the destination 
doesn't yet exist (because it's part of the proposed Process Group but not yet 
added),
+            // we will set the destination to a temporary destination. Then, 
after adding components, we will update the destinations again.
+            Connectable newDestination = getConnectable(group, 
proposedConnection.getDestination());
+            if (newDestination == null) {
+                final Funnel temporaryDestination = 
getTemporaryFunnel(connection.getProcessGroup());
+                LOG.debug("Updated Connection {} to have a temporary 
destination of {}", connection, temporaryDestination);
+                newDestination = temporaryDestination;
+                
connectionsWithTempDestination.add(proposedConnection.getIdentifier());
+            }
+
+            connection.setDestination(newDestination);
+        }
+
+        return connectionsWithTempDestination;
+    }
+
+    private Funnel getTemporaryFunnel(final ProcessGroup group) {
+        final String tempFunnelId = group.getIdentifier() + 
TEMP_FUNNEL_ID_SUFFIX;
+        Funnel temporaryFunnel = 
context.getFlowManager().getFunnel(tempFunnelId);
+        if (temporaryFunnel == null) {
+            temporaryFunnel = 
context.getFlowManager().createFunnel(tempFunnelId);
+            temporaryFunnel.setPosition(new Position(0, 0));
+            group.addFunnel(temporaryFunnel, false);
+        }
+
+        return temporaryFunnel;
+    }
+
+    private void restoreConnectionDestinations(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, Connection> 
connectionsByVersionedId,
+                                               final Set<String> 
connectionsWithTempDestination) {
+        if (connectionsWithTempDestination.isEmpty()) {
+            LOG.debug("No connections with temporary destinations for {}", 
group);
+            return;
+        }
+
+        final Map<String, VersionedConnection> versionedConnectionsById = 
proposed.getConnections().stream()
+            .collect(Collectors.toMap(VersionedConnection::getIdentifier, 
Function.identity()));
+
+        for (final String connectionId : connectionsWithTempDestination) {
+            final Connection connection = 
connectionsByVersionedId.get(connectionId);
+            final VersionedConnection versionedConnection = 
versionedConnectionsById.get(connectionId);
+
+            final Connectable newDestination = getConnectable(group, 
versionedConnection.getDestination());
+            if (newDestination != null) {
+                LOG.debug("Updated Connection {} from its temporary 
destination to its correct destination of {}", connection, newDestination);
+                connection.setDestination(newDestination);
+            }
+        }
+    }
+
+    private void removeTemporaryFunnel(final ProcessGroup group) {
+        final String tempFunnelId = group.getIdentifier() + 
TEMP_FUNNEL_ID_SUFFIX;
+        final Funnel temporaryFunnel = 
context.getFlowManager().getFunnel(tempFunnelId);
+        if (temporaryFunnel == null) {
+            LOG.debug("No temporary funnel to remove for {}", group);
+            return;
+        }
+
+        if (temporaryFunnel.getIncomingConnections().isEmpty()) {
+            LOG.debug("Updated all temporary connections for {}. Removing 
Temporary funnel from flow", group);
+            group.removeFunnel(temporaryFunnel);
+        } else {
+            LOG.warn("The temporary funnel {} for {} still has {} connections. 
It cannot be removed.", temporaryFunnel, group, 
temporaryFunnel.getIncomingConnections().size());
+        }
+    }
+
+    private <T extends Connectable> Map<String, T> componentsById(final 
ProcessGroup group, final Function<ProcessGroup, Collection<T>> 
retrieveComponents) {
+        return retrieveComponents.apply(group).stream()
+            .collect(Collectors.toMap(component -> 
component.getVersionedComponentId().orElse(
+                
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())),
 Function.identity()));
+    }
+
+
+    private <T> Map<String, T> componentsById(final ProcessGroup group, final 
Function<ProcessGroup, Collection<T>> retrieveComponents,
+                                              final Function<T, String> 
retrieveId, final Function<T, Optional<String>> retrieveVersionedComponentId) {
+
+        return retrieveComponents.apply(group).stream()
+            .collect(Collectors.toMap(component -> 
retrieveVersionedComponentId.apply(component).orElse(
+                
NiFiRegistryFlowMapper.generateVersionedComponentId(retrieveId.apply(component))),
 Function.identity()));
+    }
+
+
+    private void synchronizeFunnels(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, Funnel> funnelsByVersionedId) 
{
+        for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
+            final Funnel funnel = 
funnelsByVersionedId.get(proposedFunnel.getIdentifier());
+            if (funnel == null) {
+                final Funnel added = addFunnel(group, proposedFunnel, 
context.getComponentIdGenerator());
+                context.getFlowManager().onFunnelAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if 
(updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
+                updateFunnel(funnel, proposedFunnel);
+                LOG.info("Updated {}", funnel);
+            } else {
+                funnel.setPosition(new 
Position(proposedFunnel.getPosition().getX(), 
proposedFunnel.getPosition().getY()));
+            }
+        }
+    }
+
+    private void synchronizeInputPorts(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<Port, String> proposedPortFinalNames,
+                                                              final 
Map<String, Port> inputPortsByVersionedId) {
+        for (final VersionedPort proposedPort : proposed.getInputPorts()) {
+            final Port port = 
inputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final String temporaryName = 
generateTemporaryPortName(proposedPort);
+                final Port added = addInputPort(group, proposedPort, 
context.getComponentIdGenerator(), temporaryName);
+                proposedPortFinalNames.put(added, proposedPort.getName());
+                context.getFlowManager().onInputPortAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if 
(updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                final String temporaryName = 
generateTemporaryPortName(proposedPort);
+                proposedPortFinalNames.put(port, proposedPort.getName());
+                updatePort(port, proposedPort, temporaryName);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new 
Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+        }
+    }
+
+    private void synchronizeOutputPorts(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<Port, String> proposedPortFinalNames,
+                                        final Map<String, Port> 
outputPortsByVersionedId) {
+
+        for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
+            final Port port = 
outputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final String temporaryName = 
generateTemporaryPortName(proposedPort);
+                final Port added = addOutputPort(group, proposedPort, 
context.getComponentIdGenerator(), temporaryName);
+                proposedPortFinalNames.put(added, proposedPort.getName());
+                context.getFlowManager().onOutputPortAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if 
(updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                final String temporaryName = 
generateTemporaryPortName(proposedPort);
+                proposedPortFinalNames.put(port, proposedPort.getName());
+                updatePort(port, proposedPort, temporaryName);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new 
Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+        }
+    }
+
+    private void updatePortsToFinalNames(final Map<Port, String> 
proposedPortFinalNames) {
+        // Now that all input/output ports have been removed, we should be 
able to update
+        // all ports to the final name that was proposed in the new flow 
version.
+        for (final Map.Entry<Port, String> portAndFinalName : 
proposedPortFinalNames.entrySet()) {
+            final Port port = portAndFinalName.getKey();
+            final String finalName = portAndFinalName.getValue();
+            LOG.info("Updating {} to replace temporary name with final name", 
port);
+
+            // For public ports we need to consider if another public port 
exists somewhere else in the flow with the
+            // same name, and if so then rename the incoming port so the flow 
can still be imported
+            if (port instanceof PublicPort) {
+                final PublicPort publicPort = (PublicPort) port;
+                final String publicPortFinalName = 
getPublicPortFinalName(publicPort, finalName);
+                updatePortToSetFinalName(publicPort, publicPortFinalName);
+            } else {
+                updatePortToSetFinalName(port, finalName);
+            }
+        }
+    }
+
+    private void synchronizeLabels(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, Label> labelsByVersionedId) {
+        for (final VersionedLabel proposedLabel : proposed.getLabels()) {
+            final Label label = 
labelsByVersionedId.get(proposedLabel.getIdentifier());
+            if (label == null) {
+                final Label added = addLabel(group, proposedLabel, 
context.getComponentIdGenerator());
+                LOG.info("Added {} to {}", added, group);
+            } else if 
(updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) {
+                updateLabel(label, proposedLabel);
+                LOG.info("Updated {}", label);
+            } else {
+                label.setPosition(new 
Position(proposedLabel.getPosition().getX(), 
proposedLabel.getPosition().getY()));
+            }
+        }
+    }
+
/** Removes any Processor that exists in the group but is absent from the proposed flow. */
private void removeMissingProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId) {
    removeMissingComponents(group, proposed, processorsByVersionedId, VersionedProcessGroup::getProcessors, ProcessGroup::removeProcessor);
}

/** Removes any Input Port that exists in the group but is absent from the proposed flow. */
private void removeMissingInputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Port> portsByVersionedId) {
    removeMissingComponents(group, proposed, portsByVersionedId, VersionedProcessGroup::getInputPorts, ProcessGroup::removeInputPort);
}

/** Removes any Output Port that exists in the group but is absent from the proposed flow. */
private void removeMissingOutputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Port> portsByVersionedId) {
    removeMissingComponents(group, proposed, portsByVersionedId, VersionedProcessGroup::getOutputPorts, ProcessGroup::removeOutputPort);
}

/** Removes any Label that exists in the group but is absent from the proposed flow. */
private void removeMissingLabels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Label> labelsByVersionedId) {
    removeMissingComponents(group, proposed, labelsByVersionedId, VersionedProcessGroup::getLabels, ProcessGroup::removeLabel);
}
+
+    private void removeMissingFunnels(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, Funnel> funnelsByVersionedId) 
{
+        removeMissingComponents(group, proposed, funnelsByVersionedId, 
VersionedProcessGroup::getFunnels, (removalGroup, funnelToRemove) -> {
+            // Skip our temporary funnel
+            if 
(funnelToRemove.getIdentifier().equals(removalGroup.getIdentifier() + 
TEMP_FUNNEL_ID_SUFFIX)) {
+                return;
+            }
+
+            removalGroup.removeFunnel(funnelToRemove);
+        });
+    }
+
/** Removes any Remote Process Group that exists in the group but is absent from the proposed flow. */
private void removeMissingRpg(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> rpgsByVersionedId) {
    removeMissingComponents(group, proposed, rpgsByVersionedId, VersionedProcessGroup::getRemoteProcessGroups, ProcessGroup::removeRemoteProcessGroup);
}
+
+    private void removeMissingControllerServices(final ProcessGroup group, 
final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> 
servicesByVersionedId) {
+        final BiConsumer<ProcessGroup, ControllerServiceNode> componentRemoval 
= (grp, service) -> 
context.getControllerServiceProvider().removeControllerService(service);
+        removeMissingComponents(group, proposed, servicesByVersionedId, 
VersionedProcessGroup::getControllerServices, componentRemoval);
+    }
+
+    private void removeMissingChildGroups(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, ProcessGroup> 
groupsByVersionedId) {
+        removeMissingComponents(group, proposed, groupsByVersionedId, 
VersionedProcessGroup::getProcessGroups,
+            (procGroup, childGroup) -> {
+                // We cannot remove a Process Group unless it is empty. At 
this point, we've already removed
+                // all Processors, Input Ports, etc. that are no longer 
needed. However, we have not removed all
+                // Process Groups. We may have a situation where we have 
nested Process Groups, each one consisting
+                // now of only other Process Groups that can be removed, such 
as A -> B -> C -> D.
+                // Each of these is a Process Group that contains only other 
(otherwise empty) process groups.
+                // To accomplish this, we need to use a depth-first approach, 
removing the inner-most group (D),
+                // then C, then B, and finally A.
+                if (!childGroup.isEmpty()) {
+                    purgeChildGroupOfEmptyChildren(childGroup);
+                }
+
+                procGroup.removeProcessGroup(childGroup);
+            });
+    }
+
+    private void purgeChildGroupOfEmptyChildren(final ProcessGroup group) {
+        for (final ProcessGroup child : group.getProcessGroups()) {
+            purgeChildGroupOfEmptyChildren(child);
+
+            if (child.isEmpty()) {
+                group.removeProcessGroup(child);
+            }
+        }
+    }
+
+    private <C, V extends VersionedComponent> void 
removeMissingComponents(final ProcessGroup group, final VersionedProcessGroup 
proposed, final Map<String, C> componentsById,
+                                             final 
Function<VersionedProcessGroup, Collection<V>> getVersionedComponents, final 
BiConsumer<ProcessGroup, C> removeComponent) {
+
+        // Determine the ID's of the components to remove. To do this, we get 
the ID's of all components in the Process Group,
+        // and then remove from that the ID's of the components in the 
proposed group. That leaves us with the ID's of components
+        // that exist currently that are not in the proposed flow.
+        final Set<String> idsOfComponentsToRemove = new 
HashSet<>(componentsById.keySet());
+        for (final V versionedComponent : 
getVersionedComponents.apply(proposed)) {
+            idsOfComponentsToRemove.remove(versionedComponent.getIdentifier());
+        }
+
+        // Remove any of those components
+        for (final String idToRemove : idsOfComponentsToRemove) {
+            final C toRemove = componentsById.get(idToRemove);
+            LOG.info("Removing {} from {}", toRemove, group);
+            removeComponent.accept(group, toRemove);
+        }
+    }
+
+
+    private void synchronizeProcessors(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<ProcessorNode, Set<Relationship>> 
autoTerminatedRelationships,
+                                                                       final 
Map<String, ProcessorNode> processorsByVersionedId) throws 
ProcessorInstantiationException {
+        for (final VersionedProcessor proposedProcessor : 
proposed.getProcessors()) {
+            final ProcessorNode processor = 
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+            if (processor == null) {
+                final ProcessorNode added = addProcessor(group, 
proposedProcessor, context.getComponentIdGenerator());
+                context.getFlowManager().onProcessorAdded(added);
+
+                final Set<Relationship> proposedAutoTerminated =
+                    proposedProcessor.getAutoTerminatedRelationships() == null 
? Collections.emptySet() : 
proposedProcessor.getAutoTerminatedRelationships().stream()
+                        .map(added::getRelationship)
+                        .collect(Collectors.toSet());
+                autoTerminatedRelationships.put(added, proposedAutoTerminated);
+                LOG.info("Added {} to {}", added, group);
+            } else if 
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+                updateProcessor(processor, proposedProcessor);
+
+                final Set<Relationship> proposedAutoTerminated =
+                    proposedProcessor.getAutoTerminatedRelationships() == null 
? Collections.emptySet() : 
proposedProcessor.getAutoTerminatedRelationships().stream()
+                        .map(processor::getRelationship)
+                        .collect(Collectors.toSet());
+
+                if 
(!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
+                    autoTerminatedRelationships.put(processor, 
proposedAutoTerminated);
+                }
+
+                LOG.info("Updated {}", processor);
+            } else {
+                processor.setPosition(new 
Position(proposedProcessor.getPosition().getX(), 
proposedProcessor.getPosition().getY()));
+            }
+        }
+    }
+
+    private void synchronizeRemoteGroups(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> 
rpgsByVersionedId) {
+        for (final VersionedRemoteProcessGroup proposedRpg : 
proposed.getRemoteProcessGroups()) {
+            final RemoteProcessGroup rpg = 
rpgsByVersionedId.get(proposedRpg.getIdentifier());
+            if (rpg == null) {
+                final RemoteProcessGroup added = addRemoteProcessGroup(group, 
proposedRpg, context.getComponentIdGenerator());
+                LOG.info("Added {} to {}", added, group);
+            } else if 
(updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
+                updateRemoteProcessGroup(rpg, proposedRpg, 
context.getComponentIdGenerator());
+                LOG.info("Updated {}", rpg);
+            } else {
+                rpg.setPosition(new Position(proposedRpg.getPosition().getX(), 
proposedRpg.getPosition().getY()));
+            }
+        }
+    }
+
+    @Override
+    public void verifyCanSynchronize(final ProcessGroup group, final 
VersionedProcessGroup flowContents, final boolean verifyConnectionRemoval) {
+        // Ensure no deleted child process groups contain templates and 
optionally no deleted connections contain data
+        // in their queue. Note that this check enforces ancestry among the 
group components to avoid a scenario where
+        // a component is matched by id, but it does not exist in the same 
hierarchy and thus will be removed and
+        // re-added when the update is performed
+        verifyCanRemoveMissingComponents(group, flowContents, 
verifyConnectionRemoval);
+
+        // Determine which input ports were removed from this process group
+        final Map<String, Port> removedInputPortsByVersionId = new HashMap<>();
+        group.getInputPorts()
+            .forEach(port -> 
removedInputPortsByVersionId.put(port.getVersionedComponentId().orElse(
+                
NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), 
port));
+
+        flowContents.getInputPorts().stream()
+            .map(VersionedPort::getIdentifier)
+            .forEach(removedInputPortsByVersionId::remove);
+
+        // Ensure that there are no incoming connections for any Input Port 
that was removed.
+        for (final Port inputPort : removedInputPortsByVersionId.values()) {
+            final List<Connection> incomingConnections = 
inputPort.getIncomingConnections();
+            if (!incomingConnections.isEmpty()) {
+                throw new IllegalStateException(group + " cannot be updated to 
the proposed flow because the proposed flow "
+                    + "does not contain the Input Port " + inputPort + " and 
the Input Port currently has an incoming connection");
+            }
+        }
+
+        // Determine which output ports were removed from this process group
+        final Map<String, Port> removedOutputPortsByVersionId = new 
HashMap<>();
+        group.getOutputPorts()
+            .forEach(port -> 
removedOutputPortsByVersionId.put(port.getVersionedComponentId().orElse(
+                
NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), 
port));
+
+        flowContents.getOutputPorts().stream()
+            .map(VersionedPort::getIdentifier)
+            .forEach(removedOutputPortsByVersionId::remove);
+
+        // Ensure that there are no outgoing connections for any Output Port 
that was removed.
+        for (final Port outputPort : removedOutputPortsByVersionId.values()) {
+            final Set<Connection> outgoingConnections = 
outputPort.getConnections();
+            if (!outgoingConnections.isEmpty()) {
+                throw new IllegalStateException(group + " cannot be updated to 
the proposed flow because the proposed flow "
+                    + "does not contain the Output Port " + outputPort + " and 
the Output Port currently has an outgoing connection");
+            }
+        }
+
+        // Ensure that all Processors are instantiable
+        final Map<String, VersionedProcessor> proposedProcessors = new 
HashMap<>();
+        findAllProcessors(flowContents, proposedProcessors);
+
+        group.findAllProcessors()
+            .forEach(proc -> 
proposedProcessors.remove(proc.getVersionedComponentId().orElse(
+                
NiFiRegistryFlowMapper.generateVersionedComponentId(proc.getIdentifier()))));
+
+        for (final VersionedProcessor processorToAdd : 
proposedProcessors.values()) {
+            final String processorToAddClass = processorToAdd.getType();
+            final BundleCoordinate processorToAddCoordinate = 
toCoordinate(processorToAdd.getBundle());
+
+            // Get the exact bundle requested, if it exists.
+            final Bundle bundle = processorToAdd.getBundle();
+            final BundleCoordinate coordinate = new 
BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+            final org.apache.nifi.bundle.Bundle resolved = 
context.getExtensionManager().getBundle(coordinate);
+
+            if (resolved == null) {
+                // Could not resolve the bundle explicitly. Check for possible 
bundles.
+                final List<org.apache.nifi.bundle.Bundle> possibleBundles = 
context.getExtensionManager().getBundles(processorToAddClass);
+                final boolean bundleExists = possibleBundles.stream()
+                    .anyMatch(b -> 
processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
+
+                if (!bundleExists && possibleBundles.size() != 1) {
+                    LOG.warn("Unknown bundle {} for processor type {} - will 
use Ghosted component instead", processorToAddCoordinate, processorToAddClass);
+                }
+            }
+        }
+
+        // Ensure that all Controller Services are instantiable
+        final Map<String, VersionedControllerService> proposedServices = new 
HashMap<>();
+        findAllControllerServices(flowContents, proposedServices);
+
+        group.findAllControllerServices()
+            .forEach(service -> 
proposedServices.remove(service.getVersionedComponentId().orElse(
+                
NiFiRegistryFlowMapper.generateVersionedComponentId(service.getIdentifier()))));
+
+        for (final VersionedControllerService serviceToAdd : 
proposedServices.values()) {
+            final String serviceToAddClass = serviceToAdd.getType();
+            final BundleCoordinate serviceToAddCoordinate = 
toCoordinate(serviceToAdd.getBundle());
+
+            final org.apache.nifi.bundle.Bundle resolved = 
context.getExtensionManager().getBundle(serviceToAddCoordinate);
+            if (resolved == null) {
+                final List<org.apache.nifi.bundle.Bundle> possibleBundles = 
context.getExtensionManager().getBundles(serviceToAddClass);
+                final boolean bundleExists = possibleBundles.stream()
+                    .anyMatch(b -> 
serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
+
+                if (!bundleExists && possibleBundles.size() != 1) {
+                    LOG.warn("Unknown bundle {} for processor type {} - will 
use Ghosted component instead", serviceToAddCoordinate, serviceToAddClass);
+//                    throw new IllegalArgumentException("Unknown bundle " + 
serviceToAddCoordinate.toString() + " for service type " + serviceToAddClass);
+                }
+            }
+        }
+
+        // Ensure that all Prioritizers are instantiable and that any load balancing configuration is correct

Review comment:
       "instantiate-able"? I must have been tired :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to