http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 7bab76d..bdd328c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -28,7 +28,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.ClassUtils; @@ -44,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.nar.ExtensionManager; @@ -59,15 +59,16 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.PortType; import org.apache.nifi.registry.flow.Position; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFunnel; import org.apache.nifi.registry.flow.VersionedLabel; import org.apache.nifi.registry.flow.VersionedPort; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; @@ -80,56 +81,16 @@ public class NiFiRegistryFlowMapper { // created before attempting to create the connection, where the ConnectableDTO is converted. private Map<String, String> versionedComponentIds = new HashMap<>(); - public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) { + public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient, + final boolean mapDescendantVersionedFlows) { versionedComponentIds.clear(); - final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true, mapDescendantVersionedFlows); + final InstantiatedVersionedProcessGroup mapped = mapGroup(group, serviceProvider, registryClient, true, mapDescendantVersionedFlows); - populateReferencedAncestorServices(group, mapped); populateReferencedAncestorVariables(group, mapped); return mapped; } - private void populateReferencedAncestorServices(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { - final Set<ControllerServiceNode> ancestorControllerServices = group.getControllerServices(true); - ancestorControllerServices.remove(group.getControllerServices(false)); - final Map<String, ControllerServiceNode> ancestorServicesById = ancestorControllerServices.stream() - .collect(Collectors.toMap(ControllerServiceNode::getIdentifier, Function.identity())); - - final Set<ControllerServiceNode> referenced = new HashSet<>(); - - for (final ProcessorNode processor : group.findAllProcessors()) { - findReferencedServices(processor, ancestorServicesById, referenced); - } - - for (final ControllerServiceNode service : group.findAllControllerServices()) { - findReferencedServices(service, ancestorServicesById, referenced); - } - - final Set<VersionedControllerService> versionedServices = referenced.stream().map(this::mapControllerService) - .collect(Collectors.toCollection(LinkedHashSet::new)); - - versionedGroup.getControllerServices().addAll(versionedServices); - } - - private Set<ControllerServiceNode> findReferencedServices(final ConfiguredComponent component, final Map<String, ControllerServiceNode> ancestorServicesById, - final Set<ControllerServiceNode> referenced) { - - for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) { - final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.getControllerServiceDefinition() != null) { - final String serviceId = entry.getValue(); - final ControllerServiceNode serviceNode = ancestorServicesById.get(serviceId); - if (serviceNode != null) { - referenced.add(serviceNode); - referenced.addAll(findReferencedServices(serviceNode, ancestorServicesById, referenced)); - } - } - } - - return referenced; - } - private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { final Set<String> ancestorVariableNames = new HashSet<>(); populateVariableNames(group.getParent(), ancestorVariableNames); @@ -167,7 +128,9 @@ public class NiFiRegistryFlowMapper { } - private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel, final boolean mapDescendantVersionedFlows) { + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceLookup, final FlowRegistryClient registryClient, + final boolean topLevel, final boolean mapDescendantVersionedFlows) { + final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); @@ -212,7 +175,7 @@ public class NiFiRegistryFlowMapper { } versionedGroup.setControllerServices(group.getControllerServices(false).stream() - .map(this::mapControllerService) + .map(service -> mapControllerService(service, serviceLookup)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setFunnels(group.getFunnels().stream() @@ -232,7 +195,7 @@ public class NiFiRegistryFlowMapper { .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setProcessors(group.getProcessors().stream() - .map(this::mapProcessor) + .map(processor -> mapProcessor(processor, serviceLookup)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream() @@ -240,7 +203,7 @@ public class NiFiRegistryFlowMapper { .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setProcessGroups(group.getProcessGroups().stream() - .map(grp -> mapGroup(grp, registryClient, false, mapDescendantVersionedFlows)) + .map(grp -> mapGroup(grp, serviceLookup, registryClient, false, mapDescendantVersionedFlows)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setConnections(group.getConnections().stream() @@ -335,7 +298,7 @@ public class NiFiRegistryFlowMapper { return component; } - public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService) { + public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService, final ControllerServiceProvider serviceProvider) { final VersionedControllerService versionedService = new InstantiatedVersionedControllerService(controllerService.getIdentifier(), controllerService.getProcessGroupIdentifier()); versionedService.setIdentifier(getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier())); versionedService.setGroupIdentifier(getGroupId(controllerService.getProcessGroupIdentifier())); @@ -345,14 +308,16 @@ public class NiFiRegistryFlowMapper { versionedService.setComments(controllerService.getComments()); versionedService.setControllerServiceApis(mapControllerServiceApis(controllerService)); - versionedService.setProperties(mapProperties(controllerService)); + versionedService.setProperties(mapProperties(controllerService, serviceProvider)); + versionedService.setPropertyDescriptors(mapPropertyDescriptors(controllerService)); versionedService.setType(controllerService.getCanonicalClassName()); return versionedService; } - private Map<String, String> mapProperties(final ConfiguredComponent component) { + private Map<String, String> mapProperties(final ConfiguredComponent component, final ControllerServiceProvider serviceProvider) { final Map<String, String> mapped = new HashMap<>(); + component.getProperties().keySet().stream() .filter(property -> !property.isSensitive()) .forEach(property -> { @@ -360,11 +325,34 @@ public class NiFiRegistryFlowMapper { if (value == null) { value = property.getDefaultValue(); } + + if (value != null && property.getControllerServiceDefinition() != null) { + // Property references a Controller Service. Instead of storing the existing value, we want + // to store the Versioned Component ID of the service. + final ControllerServiceNode controllerService = serviceProvider.getControllerServiceNode(value); + if (controllerService != null) { + value = getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier()); + } + } + mapped.put(property.getName(), value); }); + return mapped; } + private Map<String, VersionedPropertyDescriptor> mapPropertyDescriptors(final ConfiguredComponent component) { + final Map<String, VersionedPropertyDescriptor> descriptors = new HashMap<>(); + for (final PropertyDescriptor descriptor : component.getProperties().keySet()) { + final VersionedPropertyDescriptor versionedDescriptor = new VersionedPropertyDescriptor(); + versionedDescriptor.setName(descriptor.getName()); + versionedDescriptor.setDisplayName(descriptor.getDisplayName()); + versionedDescriptor.setIdentifiesControllerService(descriptor.getControllerServiceDefinition() != null); + descriptors.put(descriptor.getName(), versionedDescriptor); + } + return descriptors; + } + private Bundle mapBundle(final BundleCoordinate coordinate) { final Bundle versionedBundle = new Bundle(); versionedBundle.setGroup(coordinate.getGroup()); @@ -441,7 +429,7 @@ public class NiFiRegistryFlowMapper { return position; } - public VersionedProcessor mapProcessor(final ProcessorNode procNode) { + public VersionedProcessor mapProcessor(final ProcessorNode procNode, final ControllerServiceProvider serviceProvider) { final VersionedProcessor processor = new InstantiatedVersionedProcessor(procNode.getIdentifier(), procNode.getProcessGroupIdentifier()); processor.setIdentifier(getId(procNode.getVersionedComponentId(), procNode.getIdentifier())); processor.setGroupIdentifier(getGroupId(procNode.getProcessGroupIdentifier())); @@ -456,7 +444,8 @@ public class NiFiRegistryFlowMapper { processor.setName(procNode.getName()); processor.setPenaltyDuration(procNode.getPenalizationPeriod()); processor.setPosition(mapPosition(procNode.getPosition())); - processor.setProperties(mapProperties(procNode)); + processor.setProperties(mapProperties(procNode, serviceProvider)); + processor.setPropertyDescriptors(mapPropertyDescriptors(procNode)); processor.setRunDurationMillis(procNode.getRunDuration(TimeUnit.MILLISECONDS)); processor.setSchedulingPeriod(procNode.getSchedulingPeriod()); processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 67710fe..ef05a1b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -178,7 +178,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { }; final Runnable checkAuthorizations = new InitializationTask(); - backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id); + backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true); backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); backgroundThreadExecutor.submit(() -> { try { @@ -435,11 +435,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { * and started. * * @param ports the new ports + * @param pruneUnusedPorts if true, any ports that are not included in the given set of ports + * and that do not have any incoming connections will be removed. * * @throws NullPointerException if the given argument is null */ @Override - public void setInputPorts(final Set<RemoteProcessGroupPortDescriptor> ports) { + public void setInputPorts(final Set<RemoteProcessGroupPortDescriptor> ports, final boolean pruneUnusedPorts) { writeLock.lock(); try { final List<String> newPortTargetIds = new ArrayList<>(); @@ -478,16 +480,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // See if we have any ports that no longer exist; cannot be removed within the loop because it would cause // a ConcurrentModificationException. - final Iterator<StandardRemoteGroupPort> itr = inputPorts.values().iterator(); - while (itr.hasNext()) { - final StandardRemoteGroupPort port = itr.next(); - if (!newPortTargetIds.contains(port.getTargetIdentifier())) { - port.setTargetExists(false); - port.setTargetRunning(false); - - // If port has incoming connection, it will be cleaned up when the connection is removed - if (!port.hasIncomingConnection()) { - itr.remove(); + if (pruneUnusedPorts) { + final Iterator<StandardRemoteGroupPort> itr = inputPorts.values().iterator(); + while (itr.hasNext()) { + final StandardRemoteGroupPort port = itr.next(); + if (!newPortTargetIds.contains(port.getTargetIdentifier())) { + port.setTargetExists(false); + port.setTargetRunning(false); + + // If port has incoming connection, it will be cleaned up when the connection is removed + if (!port.hasIncomingConnection()) { + itr.remove(); + } } } } @@ -521,11 +525,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { * and started. * * @param ports the new ports + * @param pruneUnusedPorts if true, will remove any ports that are not in the given list and that have + * no outgoing connections * * @throws NullPointerException if the given argument is null */ @Override - public void setOutputPorts(final Set<RemoteProcessGroupPortDescriptor> ports) { + public void setOutputPorts(final Set<RemoteProcessGroupPortDescriptor> ports, final boolean pruneUnusedPorts) { writeLock.lock(); try { final List<String> newPortTargetIds = new ArrayList<>(); @@ -535,7 +541,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final Map<String, StandardRemoteGroupPort> outputPortByTargetId = outputPorts.values().stream() .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); - final Map<String, StandardRemoteGroupPort> outputPortByName = inputPorts.values().stream() + final Map<String, StandardRemoteGroupPort> outputPortByName = outputPorts.values().stream() .collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity())); // Check if we have a matching port already and add the port if not. We determine a matching port @@ -564,16 +570,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // See if we have any ports that no longer exist; cannot be removed within the loop because it would cause // a ConcurrentModificationException. - final Iterator<StandardRemoteGroupPort> itr = outputPorts.values().iterator(); - while (itr.hasNext()) { - final StandardRemoteGroupPort port = itr.next(); - if (!newPortTargetIds.contains(port.getTargetIdentifier())) { - port.setTargetExists(false); - port.setTargetRunning(false); - - // If port has connections, it will be cleaned up when connections are removed - if (port.getConnections().isEmpty()) { - itr.remove(); + if (pruneUnusedPorts) { + final Iterator<StandardRemoteGroupPort> itr = outputPorts.values().iterator(); + while (itr.hasNext()) { + final StandardRemoteGroupPort port = itr.next(); + if (!newPortTargetIds.contains(port.getTargetIdentifier())) { + port.setTargetExists(false); + port.setTargetRunning(false); + + // If port has connections, it will be cleaned up when connections are removed + if (port.getConnections().isEmpty()) { + itr.remove(); + } } } } @@ -617,53 +625,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } - @Override - public void removeAllNonExistentPorts() { - writeLock.lock(); - try { - final Set<String> inputPortIds = new HashSet<>(); - final Set<String> outputPortIds = new HashSet<>(); - - for (final Map.Entry<String, StandardRemoteGroupPort> entry : inputPorts.entrySet()) { - final RemoteGroupPort port = entry.getValue(); - - if (port.getTargetExists()) { - continue; - } - - // If there's a connection, we don't remove it. - if (port.hasIncomingConnection()) { - continue; - } - - inputPortIds.add(entry.getKey()); - } - - for (final Map.Entry<String, StandardRemoteGroupPort> entry : outputPorts.entrySet()) { - final RemoteGroupPort port = entry.getValue(); - - if (port.getTargetExists()) { - continue; - } - - // If there's a connection, we don't remove it. - if (!port.getConnections().isEmpty()) { - continue; - } - - outputPortIds.add(entry.getKey()); - } - - for (final String id : inputPortIds) { - inputPorts.remove(id); - } - for (final String id : outputPortIds) { - outputPorts.remove(id); - } - } finally { - writeLock.unlock(); - } - } /** * Adds an Output Port to this Remote Process Group that is described by @@ -865,35 +826,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { dto = apiClient.getController(targetUris); } catch (IOException e) { - writeLock.lock(); - try { - for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) { - final StandardRemoteGroupPort inputPort = iter.next(); - if (!inputPort.hasIncomingConnection()) { - iter.remove(); - } - } - - for (final Iterator<StandardRemoteGroupPort> iter = outputPorts.values().iterator(); iter.hasNext();) { - final StandardRemoteGroupPort outputPort = iter.next(); - if (outputPort.getConnections().isEmpty()) { - iter.remove(); - } - } - } finally { - writeLock.unlock(); - } - throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + targetUris + " due to: " + e.getMessage()); } writeLock.lock(); try { if (dto.getInputPorts() != null) { - setInputPorts(convertRemotePort(dto.getInputPorts())); + setInputPorts(convertRemotePort(dto.getInputPorts()), true); } if (dto.getOutputPorts() != null) { - setOutputPorts(convertRemotePort(dto.getOutputPorts())); + setOutputPorts(convertRemotePort(dto.getOutputPorts()), true); } // set the controller details http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index ef69906..d006cff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -347,7 +347,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public ControllerServiceNode findControllerService(final String id) { + public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) { return serviceMap.get(id); } @@ -678,4 +678,8 @@ public class MockProcessGroup implements ProcessGroup { @Override public void verifyCanShowLocalModifications() { } + + @Override + public void onComponentModified() { + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 98c7bc8..4945296 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -3731,7 +3731,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); + final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false); return versionedGroup; } @@ -3753,21 +3753,39 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser()); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup); - final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor()); + final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup); + final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); - final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); + final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison, + diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED); + final FlowComparisonEntity entity = new FlowComparisonEntity(); entity.setComponentDifferences(differenceDtos); return entity; } + private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) { + final Set<String> ancestorServiceIds; + ProcessGroup parentGroup = group.getParent(); + + if (parentGroup == null) { + ancestorServiceIds = Collections.emptySet(); + } else { + ancestorServiceIds = parentGroup.getControllerServices(true).stream() + .map(ControllerServiceNode::getIdentifier) + .collect(Collectors.toSet()); + } + + return ancestorServiceIds; + } + @Override public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); @@ -3852,12 +3870,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient, true); + final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents); final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor()); + final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group); + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream() http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 3fa4462..7b753d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1717,7 +1717,7 @@ public class ProcessGroupResource extends ApplicationResource { // To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position. flowSnapshot.getFlowContents().setPosition(null); entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, - versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true); + versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true); } populateRemainingProcessGroupEntityContent(entity); http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 5b33d90..8e5974a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -117,6 +117,8 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; @@ -213,6 +215,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -2187,17 +2190,23 @@ public final class DtoFactory { public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) { + return createComponentDifferenceDtos(comparison, null); + } + + public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison, final Predicate<FlowDifference> filter) { final Map<ComponentDifferenceDTO, List<DifferenceDTO>> differencesByComponent = new HashMap<>(); for (final FlowDifference difference : comparison.getDifferences()) { - final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); - final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); + if (filter == null || filter.test(difference)) { + final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); + final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); - final DifferenceDTO dto = new DifferenceDTO(); - dto.setDifferenceType(difference.getDifferenceType().getDescription()); - dto.setDifference(difference.getDescription()); + final DifferenceDTO dto = new DifferenceDTO(); + dto.setDifferenceType(difference.getDifferenceType().getDescription()); + dto.setDifference(difference.getDescription()); - differences.add(dto); + differences.add(dto); + } } for (final Map.Entry<ComponentDifferenceDTO, List<DifferenceDTO>> entry : differencesByComponent.entrySet()) { @@ -2252,6 +2261,12 @@ public final class DtoFactory { dto.setVersion(versionControlInfo.getVersion()); dto.setCurrent(versionControlInfo.isCurrent()); dto.setModified(versionControlInfo.isModified()); + + final VersionedFlowStatus status = versionControlInfo.getStatus(); + final VersionedFlowState state = status.getState(); + dto.setState(state == null ? null : state.name()); + dto.setStateExplanation(status.getStateExplanation()); + return dto; } @@ -3488,6 +3503,8 @@ public final class DtoFactory { copy.setVersion(original.getVersion()); copy.setCurrent(original.getCurrent()); copy.setModified(original.getModified()); + copy.setState(original.getState()); + copy.setStateExplanation(original.getStateExplanation()); return copy; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 29e5f7d..907c8dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -52,6 +52,7 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.ContentNotFoundException; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -172,6 +173,10 @@ public class ControllerFacade implements Authorizable { } } + public ControllerServiceProvider getControllerServiceProvider() { + return flowController; + } + /** * Sets the name of this controller. * http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index 0f9ec7a..4d8e984 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.web.dao.impl; +import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS; + import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.state.Scope; @@ -45,8 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS; - public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO { private ControllerServiceProvider serviceProvider; @@ -172,6 +172,22 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } } + controllerService.getProcessGroup().onComponentModified(); + + // For any component that references this Controller Service, find the component's Process Group + // and notify the Process Group that a component has been modified. This way, we know to re-calculate + // whether or not the Process Group has local modifications. + final ProcessGroup group = controllerService.getProcessGroup(); + controllerService.getReferences().getReferencingComponents().stream() + .map(ConfiguredComponent::getProcessGroupIdentifier) + .filter(id -> !id.equals(group.getIdentifier())) + .forEach(groupId -> { + final ProcessGroup descendant = group.findProcessGroup(groupId); + if (descendant != null) { + descendant.onComponentModified(); + } + }); + return controllerService; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java index e4ec239..60426c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java @@ -91,6 +91,8 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO { } } + funnel.getProcessGroup().onComponentModified(); + return funnel; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java index f830e9b..2d47720 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java @@ -237,6 +237,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO { inputPort.setMaxConcurrentTasks(concurrentTasks); } + inputPort.getProcessGroup().onComponentModified(); return inputPort; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java index 2a8b19f..b8105e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java @@ -102,6 +102,7 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO { label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); } + label.getProcessGroup().onComponentModified(); return label; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java index bad9e3a..72bc49b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java @@ -233,6 +233,7 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO { outputPort.setMaxConcurrentTasks(concurrentTasks); } + outputPort.getProcessGroup().onComponentModified(); return outputPort; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index bb7edb1..1aaf4cc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -204,7 +204,11 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou CompletableFuture<Void> future = CompletableFuture.completedFuture(null); for (final String serviceId : serviceIds) { - final ControllerServiceNode serviceNode = group.findControllerService(serviceId); + final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true); + if (serviceNode == null) { + throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId); + } + if (ControllerServiceState.ENABLED.equals(state)) { final CompletableFuture<Void> serviceFuture = flowController.enableControllerService(serviceNode); future = CompletableFuture.allOf(future, serviceFuture); @@ -234,6 +238,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou group.setComments(comments); } + group.onComponentModified(); return group; } @@ -247,7 +252,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false); + final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController, flowController.getFlowRegistryClient(), false); final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .registryName(registryName) @@ -257,6 +262,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou .build(); group.setVersionControlInformation(vci, versionedComponentMapping); + group.onComponentModified(); return group; } @@ -279,6 +285,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou .build(); group.setVersionControlInformation(svci, Collections.emptyMap()); + group.onComponentModified(); + return group; } @@ -295,6 +303,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou .forEach(var -> variableMap.put(var.getName(), var.getValue())); group.setVariables(variableMap); + group.onComponentModified(); return group; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index ffbe21c..95d0b54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -411,6 +411,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { // configure the processor configureProcessor(processor, processorDTO); + parentGroup.onComponentModified(); // attempt to change the underlying processor if an updated bundle is specified // updating the bundle must happen after configuring so that any additional classpath resources are set first http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index d638839..c570dfc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -314,6 +314,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // perform the update updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + remoteProcessGroup.getProcessGroup().onComponentModified(); return port; } @@ -332,6 +333,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // perform the update updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + remoteProcessGroup.getProcessGroup().onComponentModified(); return port; } @@ -373,8 +375,6 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) { RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupDTO.getId()); return updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO); - - } private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) { @@ -447,6 +447,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot } } + remoteProcessGroup.getProcessGroup().onComponentModified(); return remoteProcessGroup; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java index 7fdaf56..a801dcb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java @@ -26,6 +26,7 @@ import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; @@ -38,6 +39,9 @@ public class AffectedComponentUtils { case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user); return dtoFactory.createAffectedComponentEntity(procEntity); + case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE: + final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(serviceEntity); case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT: { final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user); final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents(); http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java index 3961be7..28fef0f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -107,7 +107,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final int scheduleComponentStatus = clusterResponse.getStatus(); if (scheduleComponentStatus != Status.OK.getStatusCode()) { - throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState); + final String explanation = getResponseEntity(clusterResponse, String.class); + throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState + " due to " + explanation); } final boolean processorsTransitioned = waitForProcessorStatus(user, exampleUri, groupId, componentMap, desiredState, pause); @@ -312,7 +313,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final int disableServicesStatus = clusterResponse.getStatus(); if (disableServicesStatus != Status.OK.getStatusCode()) { - throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState); + final String explanation = getResponseEntity(clusterResponse, String.class); + throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation); } final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, affectedServiceIds, desiredState, pause);
