simonbence commented on code in PR #6963:
URL: https://github.com/apache/nifi/pull/6963#discussion_r1108431564
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -395,76 +395,80 @@ private void synchronize(final ProcessGroup group, final
VersionedProcessGroup p
final Set<String> connectionsWithTempDestination =
updateConnectionDestinations(group, proposed, connectionsByVersionedId);
try {
- final Map<String, Funnel> funnelsByVersionedId =
componentsById(group, ProcessGroup::getFunnels);
- final Map<String, ProcessorNode> processorsByVersionedId =
componentsById(group, ProcessGroup::getProcessors);
- final Map<String, Port> inputPortsByVersionedId =
componentsById(group, ProcessGroup::getInputPorts);
- final Map<String, Port> outputPortsByVersionedId =
componentsById(group, ProcessGroup::getOutputPorts);
- final Map<String, Label> labelsByVersionedId =
componentsById(group, ProcessGroup::getLabels, Label::getIdentifier,
Label::getVersionedComponentId);
- final Map<String, RemoteProcessGroup> rpgsByVersionedId =
componentsById(group, ProcessGroup::getRemoteProcessGroups,
- RemoteProcessGroup::getIdentifier,
RemoteProcessGroup::getVersionedComponentId);
- final Map<String, ProcessGroup> childGroupsByVersionedId =
componentsById(group, ProcessGroup::getProcessGroups,
ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
-
- removeMissingProcessors(group, proposed, processorsByVersionedId);
- removeMissingFunnels(group, proposed, funnelsByVersionedId);
- removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
- removeMissingOutputPorts(group, proposed,
outputPortsByVersionedId);
- removeMissingLabels(group, proposed, labelsByVersionedId);
- removeMissingRpg(group, proposed, rpgsByVersionedId);
- removeMissingChildGroups(group, proposed,
childGroupsByVersionedId);
-
- // Synchronize Child Process Groups
- synchronizeChildGroups(group, proposed,
versionedParameterContexts, childGroupsByVersionedId,
parameterProviderReferences, topLevelGroup);
-
- synchronizeFunnels(group, proposed, funnelsByVersionedId);
- synchronizeInputPorts(group, proposed, proposedPortFinalNames,
inputPortsByVersionedId);
- synchronizeOutputPorts(group, proposed, proposedPortFinalNames,
outputPortsByVersionedId);
- synchronizeLabels(group, proposed, labelsByVersionedId);
- synchronizeProcessors(group, proposed, processorsByVersionedId,
topLevelGroup);
- synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
- } finally {
- // Make sure that we reset the connections
- restoreConnectionDestinations(group, proposed,
connectionsByVersionedId, connectionsWithTempDestination);
- removeTemporaryFunnel(group);
- }
-
- Map<String, Parameter> newParameters = new HashMap<>();
- if (!proposedParameterContextExistsBeforeSynchronize &&
this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId())
{
- Map<String, String> controllerServiceVersionedIdToId =
group.getControllerServices(false)
- .stream()
- .filter(controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().isPresent())
- .collect(Collectors.toMap(
- controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().get(),
- ComponentNode::getIdentifier
- ));
-
- ParameterContext parameterContext = group.getParameterContext();
-
- if (parameterContext != null) {
- parameterContext.getParameters().forEach((descriptor,
parameter) -> {
- List<ParameterReferencedControllerServiceData>
referencedControllerServiceData = parameterContext
- .getParameterReferenceManager()
- .getReferencedControllerServiceData(parameterContext,
descriptor.getName());
-
- if (referencedControllerServiceData.isEmpty()) {
- newParameters.put(descriptor.getName(), parameter);
- } else {
- final Parameter adjustedParameter = new
Parameter(parameter.getDescriptor(),
controllerServiceVersionedIdToId.get(parameter.getValue()));
- newParameters.put(descriptor.getName(),
adjustedParameter);
- }
- });
+ try {
+ final Map<String, Funnel> funnelsByVersionedId =
componentsById(group, ProcessGroup::getFunnels);
+ final Map<String, ProcessorNode> processorsByVersionedId =
componentsById(group, ProcessGroup::getProcessors);
+ final Map<String, Port> inputPortsByVersionedId =
componentsById(group, ProcessGroup::getInputPorts);
+ final Map<String, Port> outputPortsByVersionedId =
componentsById(group, ProcessGroup::getOutputPorts);
+ final Map<String, Label> labelsByVersionedId =
componentsById(group, ProcessGroup::getLabels, Label::getIdentifier,
Label::getVersionedComponentId);
+ final Map<String, RemoteProcessGroup> rpgsByVersionedId =
componentsById(group, ProcessGroup::getRemoteProcessGroups,
+ RemoteProcessGroup::getIdentifier,
RemoteProcessGroup::getVersionedComponentId);
+ final Map<String, ProcessGroup> childGroupsByVersionedId =
componentsById(group, ProcessGroup::getProcessGroups,
ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+ removeMissingProcessors(group, proposed,
processorsByVersionedId);
+ removeMissingFunnels(group, proposed, funnelsByVersionedId);
+ removeMissingInputPorts(group, proposed,
inputPortsByVersionedId);
+ removeMissingOutputPorts(group, proposed,
outputPortsByVersionedId);
+ removeMissingLabels(group, proposed, labelsByVersionedId);
+ removeMissingRpg(group, proposed, rpgsByVersionedId);
+ removeMissingChildGroups(group, proposed,
childGroupsByVersionedId);
+
+ // Synchronize Child Process Groups
+ synchronizeChildGroups(group, proposed,
versionedParameterContexts, childGroupsByVersionedId,
parameterProviderReferences, topLevelGroup);
+
+ synchronizeFunnels(group, proposed, funnelsByVersionedId);
+ synchronizeInputPorts(group, proposed, proposedPortFinalNames,
inputPortsByVersionedId);
+ synchronizeOutputPorts(group, proposed,
proposedPortFinalNames, outputPortsByVersionedId);
+ synchronizeLabels(group, proposed, labelsByVersionedId);
+ synchronizeProcessors(group, proposed,
processorsByVersionedId, topLevelGroup);
+ synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+ } finally {
+ // Make sure that we reset the connections
+ restoreConnectionDestinations(group, proposed,
connectionsByVersionedId, connectionsWithTempDestination);
+ }
+
+ Map<String, Parameter> newParameters = new HashMap<>();
+ if (!proposedParameterContextExistsBeforeSynchronize &&
this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId())
{
+ Map<String, String> controllerServiceVersionedIdToId =
group.getControllerServices(false)
+ .stream()
+ .filter(controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().isPresent())
+ .collect(Collectors.toMap(
+ controllerServiceNode ->
controllerServiceNode.getVersionedComponentId().get(),
+ ComponentNode::getIdentifier
+ ));
+
+ ParameterContext parameterContext =
group.getParameterContext();
+
+ if (parameterContext != null) {
+ parameterContext.getParameters().forEach((descriptor,
parameter) -> {
+ List<ParameterReferencedControllerServiceData>
referencedControllerServiceData = parameterContext
+ .getParameterReferenceManager()
+
.getReferencedControllerServiceData(parameterContext, descriptor.getName());
+
+ if (referencedControllerServiceData.isEmpty()) {
+ newParameters.put(descriptor.getName(), parameter);
+ } else {
+ final Parameter adjustedParameter = new
Parameter(parameter.getDescriptor(),
controllerServiceVersionedIdToId.get(parameter.getValue()));
+ newParameters.put(descriptor.getName(),
adjustedParameter);
+ }
+ });
- parameterContext.setParameters(newParameters);
+ parameterContext.setParameters(newParameters);
+ }
}
- }
- // We can now add in any necessary connections, since all connectable
components have now been created.
- synchronizeConnections(group, proposed, connectionsByVersionedId);
+ // We can now add in any necessary connections, since all
connectable components have now been created.
+ synchronizeConnections(group, proposed, connectionsByVersionedId);
- // All ports have now been added/removed as necessary. We can now
resolve the port names.
- updatePortsToFinalNames(proposedPortFinalNames);
+ // All ports have now been added/removed as necessary. We can now
resolve the port names.
+ updatePortsToFinalNames(proposedPortFinalNames);
- // Start all components that are queued up to be started now
- context.getComponentScheduler().resume();
+ // Start all components that are queued up to be started now
+ context.getComponentScheduler().resume();
+ } finally {
+ // If we created a temporary funnel, remove it if there's no
longer anything pointing to it.
+ removeTemporaryFunnel(group);
Review Comment:
LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]