This is an automated email from the ASF dual-hosted git repository.

bsimon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fed175d46f NIFI-11189: When synchronizing a ProcessGroup to match a 
VersionedProcessGroup, do not remove the temporary funnel until the very end. 
This is important if the temporary funnel already exists in the flow on startup
fed175d46f is described below

commit fed175d46f794cffae8695df6eb63adab4e8a27a
Author: Mark Payne <[email protected]>
AuthorDate: Wed Feb 15 18:03:10 2023 -0500

    NIFI-11189: When synchronizing a ProcessGroup to match a 
VersionedProcessGroup, do not remove the temporary funnel until the very end. 
This is important if the temporary funnel already exists in the flow on startup
    
    Signed-off-by: Bence Simon <[email protected]>
    This closes #6963
---
 .../StandardVersionedComponentSynchronizer.java    | 134 +++++++++++----------
 1 file changed, 69 insertions(+), 65 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 3401aa5acd..f7e64336ee 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -395,76 +395,80 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         final Set<String> connectionsWithTempDestination = 
updateConnectionDestinations(group, proposed, connectionsByVersionedId);
 
         try {
-            final Map<String, Funnel> funnelsByVersionedId = 
componentsById(group, ProcessGroup::getFunnels);
-            final Map<String, ProcessorNode> processorsByVersionedId = 
componentsById(group, ProcessGroup::getProcessors);
-            final Map<String, Port> inputPortsByVersionedId = 
componentsById(group, ProcessGroup::getInputPorts);
-            final Map<String, Port> outputPortsByVersionedId = 
componentsById(group, ProcessGroup::getOutputPorts);
-            final Map<String, Label> labelsByVersionedId = 
componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, 
Label::getVersionedComponentId);
-            final Map<String, RemoteProcessGroup> rpgsByVersionedId = 
componentsById(group, ProcessGroup::getRemoteProcessGroups,
-                RemoteProcessGroup::getIdentifier, 
RemoteProcessGroup::getVersionedComponentId);
-            final Map<String, ProcessGroup> childGroupsByVersionedId = 
componentsById(group, ProcessGroup::getProcessGroups, 
ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
-
-            removeMissingProcessors(group, proposed, processorsByVersionedId);
-            removeMissingFunnels(group, proposed, funnelsByVersionedId);
-            removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
-            removeMissingOutputPorts(group, proposed, 
outputPortsByVersionedId);
-            removeMissingLabels(group, proposed, labelsByVersionedId);
-            removeMissingRpg(group, proposed, rpgsByVersionedId);
-            removeMissingChildGroups(group, proposed, 
childGroupsByVersionedId);
-
-            // Synchronize Child Process Groups
-            synchronizeChildGroups(group, proposed, 
versionedParameterContexts, childGroupsByVersionedId, 
parameterProviderReferences, topLevelGroup);
-
-            synchronizeFunnels(group, proposed, funnelsByVersionedId);
-            synchronizeInputPorts(group, proposed, proposedPortFinalNames, 
inputPortsByVersionedId);
-            synchronizeOutputPorts(group, proposed, proposedPortFinalNames, 
outputPortsByVersionedId);
-            synchronizeLabels(group, proposed, labelsByVersionedId);
-            synchronizeProcessors(group, proposed, processorsByVersionedId, 
topLevelGroup);
-            synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
-        } finally {
-            // Make sure that we reset the connections
-            restoreConnectionDestinations(group, proposed, 
connectionsByVersionedId, connectionsWithTempDestination);
-            removeTemporaryFunnel(group);
-        }
-
-        Map<String, Parameter> newParameters = new HashMap<>();
-        if (!proposedParameterContextExistsBeforeSynchronize && 
this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId())
 {
-            Map<String, String> controllerServiceVersionedIdToId = 
group.getControllerServices(false)
-                .stream()
-                .filter(controllerServiceNode -> 
controllerServiceNode.getVersionedComponentId().isPresent())
-                .collect(Collectors.toMap(
-                    controllerServiceNode -> 
controllerServiceNode.getVersionedComponentId().get(),
-                    ComponentNode::getIdentifier
-                ));
-
-            ParameterContext parameterContext = group.getParameterContext();
-
-            if (parameterContext != null) {
-                parameterContext.getParameters().forEach((descriptor, 
parameter) -> {
-                    List<ParameterReferencedControllerServiceData> 
referencedControllerServiceData = parameterContext
-                        .getParameterReferenceManager()
-                        .getReferencedControllerServiceData(parameterContext, 
descriptor.getName());
-
-                    if (referencedControllerServiceData.isEmpty()) {
-                        newParameters.put(descriptor.getName(), parameter);
-                    } else {
-                        final Parameter adjustedParameter = new 
Parameter(parameter.getDescriptor(), 
controllerServiceVersionedIdToId.get(parameter.getValue()));
-                        newParameters.put(descriptor.getName(), 
adjustedParameter);
-                    }
-                });
+            try {
+                final Map<String, Funnel> funnelsByVersionedId = 
componentsById(group, ProcessGroup::getFunnels);
+                final Map<String, ProcessorNode> processorsByVersionedId = 
componentsById(group, ProcessGroup::getProcessors);
+                final Map<String, Port> inputPortsByVersionedId = 
componentsById(group, ProcessGroup::getInputPorts);
+                final Map<String, Port> outputPortsByVersionedId = 
componentsById(group, ProcessGroup::getOutputPorts);
+                final Map<String, Label> labelsByVersionedId = 
componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, 
Label::getVersionedComponentId);
+                final Map<String, RemoteProcessGroup> rpgsByVersionedId = 
componentsById(group, ProcessGroup::getRemoteProcessGroups,
+                    RemoteProcessGroup::getIdentifier, 
RemoteProcessGroup::getVersionedComponentId);
+                final Map<String, ProcessGroup> childGroupsByVersionedId = 
componentsById(group, ProcessGroup::getProcessGroups, 
ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+                removeMissingProcessors(group, proposed, 
processorsByVersionedId);
+                removeMissingFunnels(group, proposed, funnelsByVersionedId);
+                removeMissingInputPorts(group, proposed, 
inputPortsByVersionedId);
+                removeMissingOutputPorts(group, proposed, 
outputPortsByVersionedId);
+                removeMissingLabels(group, proposed, labelsByVersionedId);
+                removeMissingRpg(group, proposed, rpgsByVersionedId);
+                removeMissingChildGroups(group, proposed, 
childGroupsByVersionedId);
+
+                // Synchronize Child Process Groups
+                synchronizeChildGroups(group, proposed, 
versionedParameterContexts, childGroupsByVersionedId, 
parameterProviderReferences, topLevelGroup);
+
+                synchronizeFunnels(group, proposed, funnelsByVersionedId);
+                synchronizeInputPorts(group, proposed, proposedPortFinalNames, 
inputPortsByVersionedId);
+                synchronizeOutputPorts(group, proposed, 
proposedPortFinalNames, outputPortsByVersionedId);
+                synchronizeLabels(group, proposed, labelsByVersionedId);
+                synchronizeProcessors(group, proposed, 
processorsByVersionedId, topLevelGroup);
+                synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+            } finally {
+                // Make sure that we reset the connections
+                restoreConnectionDestinations(group, proposed, 
connectionsByVersionedId, connectionsWithTempDestination);
+            }
+
+            Map<String, Parameter> newParameters = new HashMap<>();
+            if (!proposedParameterContextExistsBeforeSynchronize && 
this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId())
 {
+                Map<String, String> controllerServiceVersionedIdToId = 
group.getControllerServices(false)
+                    .stream()
+                    .filter(controllerServiceNode -> 
controllerServiceNode.getVersionedComponentId().isPresent())
+                    .collect(Collectors.toMap(
+                        controllerServiceNode -> 
controllerServiceNode.getVersionedComponentId().get(),
+                        ComponentNode::getIdentifier
+                    ));
+
+                ParameterContext parameterContext = 
group.getParameterContext();
+
+                if (parameterContext != null) {
+                    parameterContext.getParameters().forEach((descriptor, 
parameter) -> {
+                        List<ParameterReferencedControllerServiceData> 
referencedControllerServiceData = parameterContext
+                            .getParameterReferenceManager()
+                            
.getReferencedControllerServiceData(parameterContext, descriptor.getName());
+
+                        if (referencedControllerServiceData.isEmpty()) {
+                            newParameters.put(descriptor.getName(), parameter);
+                        } else {
+                            final Parameter adjustedParameter = new 
Parameter(parameter.getDescriptor(), 
controllerServiceVersionedIdToId.get(parameter.getValue()));
+                            newParameters.put(descriptor.getName(), 
adjustedParameter);
+                        }
+                    });
 
-                parameterContext.setParameters(newParameters);
+                    parameterContext.setParameters(newParameters);
+                }
             }
-        }
 
-        // We can now add in any necessary connections, since all connectable 
components have now been created.
-        synchronizeConnections(group, proposed, connectionsByVersionedId);
+            // We can now add in any necessary connections, since all 
connectable components have now been created.
+            synchronizeConnections(group, proposed, connectionsByVersionedId);
 
-        // All ports have now been added/removed as necessary. We can now 
resolve the port names.
-        updatePortsToFinalNames(proposedPortFinalNames);
+            // All ports have now been added/removed as necessary. We can now 
resolve the port names.
+            updatePortsToFinalNames(proposedPortFinalNames);
 
-        // Start all components that are queued up to be started now
-        context.getComponentScheduler().resume();
+            // Start all components that are queued up to be started now
+            context.getComponentScheduler().resume();
+        } finally {
+            // If we created a temporary funnel, remove it if there's no 
longer anything pointing to it.
+            removeTemporaryFunnel(group);
+        }
     }
 
     private String determineRegistryId(final VersionedFlowCoordinates 
coordinates) {

Reply via email to