This is an automated email from the ASF dual-hosted git repository.
bsimon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fed175d46f NIFI-11189: When synchronizing a ProcessGroup to match a
VersionedProcessGroup, do not remove the temporary funnel until the very end.
This is important if the temporary funnel already exists in the flow on startup
fed175d46f is described below
commit fed175d46f794cffae8695df6eb63adab4e8a27a
Author: Mark Payne <[email protected]>
AuthorDate: Wed Feb 15 18:03:10 2023 -0500
NIFI-11189: When synchronizing a ProcessGroup to match a
VersionedProcessGroup, do not remove the temporary funnel until the very end.
This is important if the temporary funnel already exists in the flow on startup
Signed-off-by: Bence Simon <[email protected]>
This closes #6963
---
.../StandardVersionedComponentSynchronizer.java | 134 +++++++++++----------
1 file changed, 69 insertions(+), 65 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 3401aa5acd..f7e64336ee 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -395,76 +395,80 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
final Set<String> connectionsWithTempDestination =
updateConnectionDestinations(group, proposed, connectionsByVersionedId);
try {
- final Map<String, Funnel> funnelsByVersionedId =
componentsById(group, ProcessGroup::getFunnels);
- final Map<String, ProcessorNode> processorsByVersionedId =
componentsById(group, ProcessGroup::getProcessors);
- final Map<String, Port> inputPortsByVersionedId =
componentsById(group, ProcessGroup::getInputPorts);
- final Map<String, Port> outputPortsByVersionedId =
componentsById(group, ProcessGroup::getOutputPorts);
- final Map<String, Label> labelsByVersionedId =
componentsById(group, ProcessGroup::getLabels, Label::getIdentifier,
Label::getVersionedComponentId);
- final Map<String, RemoteProcessGroup> rpgsByVersionedId =
componentsById(group, ProcessGroup::getRemoteProcessGroups,
- RemoteProcessGroup::getIdentifier,
RemoteProcessGroup::getVersionedComponentId);
- final Map<String, ProcessGroup> childGroupsByVersionedId =
componentsById(group, ProcessGroup::getProcessGroups,
ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
-
- removeMissingProcessors(group, proposed, processorsByVersionedId);
- removeMissingFunnels(group, proposed, funnelsByVersionedId);
- removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
- removeMissingOutputPorts(group, proposed,
outputPortsByVersionedId);
- removeMissingLabels(group, proposed, labelsByVersionedId);
- removeMissingRpg(group, proposed, rpgsByVersionedId);
- removeMissingChildGroups(group, proposed,
childGroupsByVersionedId);
-
- // Synchronize Child Process Groups
- synchronizeChildGroups(group, proposed,
versionedParameterContexts, childGroupsByVersionedId,
parameterProviderReferences, topLevelGroup);
-
- synchronizeFunnels(group, proposed, funnelsByVersionedId);
- synchronizeInputPorts(group, proposed, proposedPortFinalNames,
inputPortsByVersionedId);
- synchronizeOutputPorts(group, proposed, proposedPortFinalNames,
outputPortsByVersionedId);
- synchronizeLabels(group, proposed, labelsByVersionedId);
- synchronizeProcessors(group, proposed, processorsByVersionedId,
topLevelGroup);
- synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
- } finally {
- // Make sure that we reset the connections
- restoreConnectionDestinations(group, proposed,
connectionsByVersionedId, connectionsWithTempDestination);
- removeTemporaryFunnel(group);
- }
-
- Map<String, Parameter> newParameters = new HashMap<>();
- if (!proposedParameterContextExistsBeforeSynchronize &&
this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId())
{
- Map<String, String> controllerServiceVersionedIdToId =
group.getControllerServices(false)
- .stream()
- .filter(controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().isPresent())
- .collect(Collectors.toMap(
- controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().get(),
- ComponentNode::getIdentifier
- ));
-
- ParameterContext parameterContext = group.getParameterContext();
-
- if (parameterContext != null) {
- parameterContext.getParameters().forEach((descriptor,
parameter) -> {
- List<ParameterReferencedControllerServiceData>
referencedControllerServiceData = parameterContext
- .getParameterReferenceManager()
- .getReferencedControllerServiceData(parameterContext,
descriptor.getName());
-
- if (referencedControllerServiceData.isEmpty()) {
- newParameters.put(descriptor.getName(), parameter);
- } else {
- final Parameter adjustedParameter = new
Parameter(parameter.getDescriptor(),
controllerServiceVersionedIdToId.get(parameter.getValue()));
- newParameters.put(descriptor.getName(),
adjustedParameter);
- }
- });
+ try {
+ final Map<String, Funnel> funnelsByVersionedId =
componentsById(group, ProcessGroup::getFunnels);
+ final Map<String, ProcessorNode> processorsByVersionedId =
componentsById(group, ProcessGroup::getProcessors);
+ final Map<String, Port> inputPortsByVersionedId =
componentsById(group, ProcessGroup::getInputPorts);
+ final Map<String, Port> outputPortsByVersionedId =
componentsById(group, ProcessGroup::getOutputPorts);
+ final Map<String, Label> labelsByVersionedId =
componentsById(group, ProcessGroup::getLabels, Label::getIdentifier,
Label::getVersionedComponentId);
+ final Map<String, RemoteProcessGroup> rpgsByVersionedId =
componentsById(group, ProcessGroup::getRemoteProcessGroups,
+ RemoteProcessGroup::getIdentifier,
RemoteProcessGroup::getVersionedComponentId);
+ final Map<String, ProcessGroup> childGroupsByVersionedId =
componentsById(group, ProcessGroup::getProcessGroups,
ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+ removeMissingProcessors(group, proposed,
processorsByVersionedId);
+ removeMissingFunnels(group, proposed, funnelsByVersionedId);
+ removeMissingInputPorts(group, proposed,
inputPortsByVersionedId);
+ removeMissingOutputPorts(group, proposed,
outputPortsByVersionedId);
+ removeMissingLabels(group, proposed, labelsByVersionedId);
+ removeMissingRpg(group, proposed, rpgsByVersionedId);
+ removeMissingChildGroups(group, proposed,
childGroupsByVersionedId);
+
+ // Synchronize Child Process Groups
+ synchronizeChildGroups(group, proposed,
versionedParameterContexts, childGroupsByVersionedId,
parameterProviderReferences, topLevelGroup);
+
+ synchronizeFunnels(group, proposed, funnelsByVersionedId);
+ synchronizeInputPorts(group, proposed, proposedPortFinalNames,
inputPortsByVersionedId);
+ synchronizeOutputPorts(group, proposed,
proposedPortFinalNames, outputPortsByVersionedId);
+ synchronizeLabels(group, proposed, labelsByVersionedId);
+ synchronizeProcessors(group, proposed,
processorsByVersionedId, topLevelGroup);
+ synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+ } finally {
+ // Make sure that we reset the connections
+ restoreConnectionDestinations(group, proposed,
connectionsByVersionedId, connectionsWithTempDestination);
+ }
+
+ Map<String, Parameter> newParameters = new HashMap<>();
+ if (!proposedParameterContextExistsBeforeSynchronize &&
this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId())
{
+ Map<String, String> controllerServiceVersionedIdToId =
group.getControllerServices(false)
+ .stream()
+ .filter(controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().isPresent())
+ .collect(Collectors.toMap(
+ controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().get(),
+ ComponentNode::getIdentifier
+ ));
+
+ ParameterContext parameterContext =
group.getParameterContext();
+
+ if (parameterContext != null) {
+ parameterContext.getParameters().forEach((descriptor,
parameter) -> {
+ List<ParameterReferencedControllerServiceData>
referencedControllerServiceData = parameterContext
+ .getParameterReferenceManager()
+
.getReferencedControllerServiceData(parameterContext, descriptor.getName());
+
+ if (referencedControllerServiceData.isEmpty()) {
+ newParameters.put(descriptor.getName(), parameter);
+ } else {
+ final Parameter adjustedParameter = new
Parameter(parameter.getDescriptor(),
controllerServiceVersionedIdToId.get(parameter.getValue()));
+ newParameters.put(descriptor.getName(),
adjustedParameter);
+ }
+ });
- parameterContext.setParameters(newParameters);
+ parameterContext.setParameters(newParameters);
+ }
}
- }
- // We can now add in any necessary connections, since all connectable
components have now been created.
- synchronizeConnections(group, proposed, connectionsByVersionedId);
+ // We can now add in any necessary connections, since all
connectable components have now been created.
+ synchronizeConnections(group, proposed, connectionsByVersionedId);
- // All ports have now been added/removed as necessary. We can now
resolve the port names.
- updatePortsToFinalNames(proposedPortFinalNames);
+ // All ports have now been added/removed as necessary. We can now
resolve the port names.
+ updatePortsToFinalNames(proposedPortFinalNames);
- // Start all components that are queued up to be started now
- context.getComponentScheduler().resume();
+ // Start all components that are queued up to be started now
+ context.getComponentScheduler().resume();
+ } finally {
+ // If we created a temporary funnel, remove it if there's no
longer anything pointing to it.
+ removeTemporaryFunnel(group);
+ }
}
private String determineRegistryId(final VersionedFlowCoordinates
coordinates) {