Repository: nifi Updated Branches: refs/heads/master d64fe416b -> ce8a0de36
NIFI-1994: Fixed issue with Controller Service Fully Qualified Class Names and ensured that services are added to the process groups as appropriate when instantiating templates. NIFI-1882: Ensuring Controller Services are copied as part of a ProcessGroupDTO. This closes #517 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ce8a0de3 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ce8a0de3 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ce8a0de3 Branch: refs/heads/master Commit: ce8a0de368f6fe9bdbf746879717c1741d8114b6 Parents: d64fe41 Author: Mark Payne <[email protected]> Authored: Thu Jun 9 20:39:29 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Mon Jun 20 22:24:25 2016 -0400 ---------------------------------------------------------------------- .../node/NodeClusterCoordinator.java | 2 +- .../manager/ControllerServiceEntityMerger.java | 22 +- .../apache/nifi/controller/FlowController.java | 3 + .../nifi/web/api/ApplicationResource.java | 44 ++-- .../apache/nifi/web/api/ControllerResource.java | 50 ++--- .../org/apache/nifi/web/api/dto/DtoFactory.java | 14 +- .../org/apache/nifi/web/util/SnippetUtils.java | 202 ++++++++----------- 7 files changed, 160 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java 
index 88853e3..a90be64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -278,7 +278,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) { - eventReporter.reportEvent(severity, EVENT_CATEGORY, "Event Reported for " + nodeId.toString() + " -- " + event); + eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event); if (nodeId != null) { addNodeEvent(nodeId, severity, event); } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java index e9e542e..8c9f0c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java @@ -94,8 +94,8 @@ public class ControllerServiceEntityMerger { 
clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); } - public static void mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentEntity> referencingComponents, - Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) { + public static void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentEntity> referencingComponents, + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) { final Map<String, Integer> activeThreadCounts = new HashMap<>(); final Map<String, String> states = new HashMap<>(); @@ -131,15 +131,17 @@ public class ControllerServiceEntityMerger { } // go through each referencing components - for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) { - final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); - if (activeThreadCount != null) { - referencingComponent.getComponent().setActiveThreadCount(activeThreadCount); - } + if (referencingComponents != null) { + for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) { + final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); + if (activeThreadCount != null) { + referencingComponent.getComponent().setActiveThreadCount(activeThreadCount); + } - final String state = states.get(referencingComponent.getId()); - if (state != null) { - referencingComponent.getComponent().setState(state); + final String state = states.get(referencingComponent.getId()); + if (state != null) { + referencingComponent.getComponent().setState(state); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 8b16ab3..012a5dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1523,6 +1523,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData()); serviceNode.setComments(controllerServiceDTO.getComments()); serviceNode.setName(controllerServiceDTO.getName()); + + group.addControllerService(serviceNode); } // configure controller services. 
We do this after creating all of them in case 1 service @@ -1717,6 +1719,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R childTemplateDTO.setProcessors(contents.getProcessors()); childTemplateDTO.setFunnels(contents.getFunnels()); childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups()); + childTemplateDTO.setControllerServices(contents.getControllerServices()); instantiateSnippet(childGroup, childTemplateDTO); } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index f11cde9..cb11f90 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -112,16 +112,16 @@ public abstract class ApplicationResource { * @param path path * @return resource uri */ - protected String generateResourceUri(String... path) { - UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); + protected String generateResourceUri(final String... 
path) { + final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); uriBuilder.segment(path); URI uri = uriBuilder.build(); try { // check for proxy settings - String scheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER); - String host = httpServletRequest.getHeader(PROXY_HOST_HTTP_HEADER); - String port = httpServletRequest.getHeader(PROXY_PORT_HTTP_HEADER); + final String scheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER); + final String host = httpServletRequest.getHeader(PROXY_HOST_HTTP_HEADER); + final String port = httpServletRequest.getHeader(PROXY_PORT_HTTP_HEADER); String baseContextPath = httpServletRequest.getHeader(PROXY_CONTEXT_PATH_HTTP_HEADER); // if necessary, prepend the context path @@ -144,7 +144,7 @@ public abstract class ApplicationResource { } else { try { uriPort = Integer.parseInt(port); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { logger.warn(String.format("Unable to parse proxy port HTTP header '%s'. Using port from request URI '%s'.", port, uriPort)); } } @@ -172,8 +172,8 @@ public abstract class ApplicationResource { * @param response response * @return builder */ - protected ResponseBuilder noCache(ResponseBuilder response) { - CacheControl cacheControl = new CacheControl(); + protected ResponseBuilder noCache(final ResponseBuilder response) { + final CacheControl cacheControl = new CacheControl(); cacheControl.setPrivate(true); cacheControl.setNoCache(true); cacheControl.setNoStore(true); @@ -186,7 +186,7 @@ public abstract class ApplicationResource { * @param response response * @return builder */ - protected ResponseBuilder clusterContext(ResponseBuilder response) { + protected ResponseBuilder clusterContext(final ResponseBuilder response) { // TODO: Remove this method. Since ClusterContext was removed, it is no longer needed. However, // it is called by practically every endpoint so for now it is just being stubbed out. 
return response; @@ -222,8 +222,8 @@ public abstract class ApplicationResource { * @param entity The entity * @return The response to be built */ - protected ResponseBuilder generateOkResponse(Object entity) { - ResponseBuilder response = Response.ok(entity); + protected ResponseBuilder generateOkResponse(final Object entity) { + final ResponseBuilder response = Response.ok(entity); return noCache(response); } @@ -234,7 +234,7 @@ public abstract class ApplicationResource { * @param entity entity * @return The response to be built */ - protected ResponseBuilder generateCreatedResponse(URI uri, Object entity) { + protected ResponseBuilder generateCreatedResponse(final URI uri, final Object entity) { // generate the response builder return Response.created(uri).entity(entity); } @@ -268,7 +268,7 @@ public abstract class ApplicationResource { // get the form that jersey processed and use it if it exists (only exist for requests with a body and application form urlencoded final Form form = (Form) httpContext.getProperties().get(FormDispatchProvider.FORM_PROPERTY); if (form == null) { - for (Map.Entry<String, String[]> entry : httpServletRequest.getParameterMap().entrySet()) { + for (final Map.Entry<String, String[]> entry : httpServletRequest.getParameterMap().entrySet()) { if (entry.getValue() == null) { entity.add(entry.getKey(), null); } else { @@ -319,7 +319,7 @@ public abstract class ApplicationResource { } // set the proxy scheme to request scheme if not already set client - String proxyScheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER); + final String proxyScheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER); if (proxyScheme == null) { result.put(PROXY_SCHEME_HTTP_HEADER, httpServletRequest.getScheme()); } @@ -351,7 +351,7 @@ public abstract class ApplicationResource { * @param httpServletRequest the request * @return <code>true</code> if the request represents a two-phase commit style request */ - protected boolean 
isTwoPhaseRequest(HttpServletRequest httpServletRequest) { + protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) { final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); return headerValue != null; } @@ -366,11 +366,11 @@ public abstract class ApplicationResource { * @return <code>true</code> if the request represents a two-phase commit style request and is the * first of the two phases. */ - protected boolean isValidationPhase(HttpServletRequest httpServletRequest) { + protected boolean isValidationPhase(final HttpServletRequest httpServletRequest) { return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null; } - protected boolean isClaimCancelationPhase(HttpServletRequest httpServletRequest) { + protected boolean isClaimCancelationPhase(final HttpServletRequest httpServletRequest) { return httpServletRequest.getHeader(RequestReplicator.CLAIM_CANCEL_HEADER) != null; } @@ -402,7 +402,7 @@ public abstract class ApplicationResource { * @param componentId the ID of the component that the Revision DTO belongs to * @return a Revision that has the same client ID and Version as the Revision DTO and the Component ID specified */ - protected Revision getRevision(RevisionDTO revisionDto, String componentId) { + protected Revision getRevision(final RevisionDTO revisionDto, final String componentId) { return new Revision(revisionDto.getVersion(), revisionDto.getClientId(), componentId); } @@ -412,7 +412,7 @@ public abstract class ApplicationResource { * @param entity the ComponentEntity that contains the Revision DTO & ID * @return the Revision specified in the ComponentEntity */ - protected Revision getRevision(ComponentEntity entity, String componentId) { + protected Revision getRevision(final ComponentEntity entity, final String componentId) { return getRevision(entity.getRevision(), componentId); } @@ -650,7 +650,7 @@ 
public abstract class ApplicationResource { return clusterCoordinator != null && clusterCoordinator.isConnected(); } - public void setRequestReplicator(RequestReplicator requestReplicator) { + public void setRequestReplicator(final RequestReplicator requestReplicator) { this.requestReplicator = requestReplicator; } @@ -658,11 +658,11 @@ public abstract class ApplicationResource { return requestReplicator; } - public void setProperties(NiFiProperties properties) { + public void setProperties(final NiFiProperties properties) { this.properties = properties; } - public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) { + public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { this.clusterCoordinator = clusterCoordinator; } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index ecd4626..4f75347 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -16,13 +16,25 @@ */ package org.apache.nifi.web.api; -import com.sun.jersey.api.core.ResourceContext; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import 
com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Collections; +import java.util.Set; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AuthorizationRequest; @@ -50,23 +62,13 @@ import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.Collections; -import java.util.Set; +import com.sun.jersey.api.core.ResourceContext; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Flow Controller. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index c99f98c..382a8c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1096,7 +1096,7 @@ public final class DtoFactory { final ReportingTaskDTO dto = new ReportingTaskDTO(); dto.setId(reportingTaskNode.getIdentifier()); dto.setName(reportingTaskNode.getName()); - dto.setType(reportingTaskNode.getComponentType()); + dto.setType(reportingTaskNode.getCanonicalClassName()); dto.setSchedulingStrategy(reportingTaskNode.getSchedulingStrategy().name()); dto.setSchedulingPeriod(reportingTaskNode.getSchedulingPeriod()); dto.setState(reportingTaskNode.getScheduledState().name()); @@ -1168,7 +1168,7 @@ public final class DtoFactory { dto.setId(controllerServiceNode.getIdentifier()); dto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? 
null : controllerServiceNode.getProcessGroup().getIdentifier()); dto.setName(controllerServiceNode.getName()); - dto.setType(controllerServiceNode.getComponentType()); + dto.setType(controllerServiceNode.getCanonicalClassName()); dto.setState(controllerServiceNode.getState().name()); dto.setAnnotationData(controllerServiceNode.getAnnotationData()); dto.setComments(controllerServiceNode.getComments()); @@ -2404,6 +2404,7 @@ public final class DtoFactory { copy.setCustomUiUrl(original.getCustomUiUrl()); copy.setDescriptors(copy(original.getDescriptors())); copy.setId(original.getId()); + copy.setParentGroupId(original.getParentGroupId()); copy.setName(original.getName()); copy.setProperties(copy(original.getProperties())); copy.setReferencingComponents(copy(original.getReferencingComponents())); @@ -2683,6 +2684,7 @@ public final class DtoFactory { final Set<ProcessorDTO> processors = new LinkedHashSet<>(); final Set<RemoteProcessGroupDTO> remoteProcessGroups = new LinkedHashSet<>(); final Set<FunnelDTO> funnels = new LinkedHashSet<>(); + final Set<ControllerServiceDTO> controllerServices = new LinkedHashSet<>(); if (deep) { for (final ProcessGroupDTO group : original.getProcessGroups()) { @@ -2716,6 +2718,10 @@ public final class DtoFactory { for (final ConnectionDTO connection : original.getConnections()) { connections.add(copy(connection)); } + + for (final ControllerServiceDTO controllerService : original.getControllerServices()) { + controllerServices.add(copy(controllerService)); + } } else { if (original.getConnections() != null) { connections.addAll(copy(original.getConnections())); @@ -2741,6 +2747,9 @@ public final class DtoFactory { if (original.getFunnels() != null) { funnels.addAll(copy(original.getFunnels())); } + if (original.getControllerServices() != null) { + controllerServices.addAll(copy(original.getControllerServices())); + } } copy.setConnections(connections); @@ -2751,6 +2760,7 @@ public final class DtoFactory { copy.setProcessors(processors); 
copy.setRemoteProcessGroups(remoteProcessGroups); copy.setFunnels(funnels); + copy.setControllerServices(controllerServices); return copy; } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce8a0de3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index e29089f..fc3a851 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -18,7 +18,6 @@ package org.apache.nifi.web.util; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -26,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.ConnectableType; @@ -73,7 +73,7 @@ public final class SnippetUtils { * @param includeControllerServices whether or not to include controller services in the flow snippet dto * @return snippet */ - public FlowSnippetDTO populateFlowSnippet(Snippet snippet, boolean recurse, boolean includeControllerServices) { + public FlowSnippetDTO populateFlowSnippet(final Snippet snippet, final boolean recurse, final boolean includeControllerServices) { final FlowSnippetDTO snippetDto = new FlowSnippetDTO(); final String groupId = 
snippet.getParentGroupId(); final ProcessGroup processGroup = flowController.getGroup(groupId); @@ -83,15 +83,21 @@ public final class SnippetUtils { throw new IllegalStateException("The parent process group for this snippet could not be found."); } + final Set<ControllerServiceDTO> controllerServices = new HashSet<>(); + // add any processors if (!snippet.getProcessors().isEmpty()) { final Set<ProcessorDTO> processors = new LinkedHashSet<>(); - for (String processorId : snippet.getProcessors().keySet()) { + for (final String processorId : snippet.getProcessors().keySet()) { final ProcessorNode processor = processGroup.getProcessor(processorId); if (processor == null) { throw new IllegalStateException("A processor in this snippet could not be found."); } processors.add(dtoFactory.createProcessorDto(processor)); + + if (includeControllerServices) { + controllerServices.addAll(getControllerServices(processor.getProperties())); + } } snippetDto.setProcessors(processors); } @@ -99,7 +105,7 @@ public final class SnippetUtils { // add any connections if (!snippet.getConnections().isEmpty()) { final Set<ConnectionDTO> connections = new LinkedHashSet<>(); - for (String connectionId : snippet.getConnections().keySet()) { + for (final String connectionId : snippet.getConnections().keySet()) { final Connection connection = processGroup.getConnection(connectionId); if (connection == null) { throw new IllegalStateException("A connection in this snippet could not be found."); @@ -112,7 +118,7 @@ public final class SnippetUtils { // add any funnels if (!snippet.getFunnels().isEmpty()) { final Set<FunnelDTO> funnels = new LinkedHashSet<>(); - for (String funnelId : snippet.getFunnels().keySet()) { + for (final String funnelId : snippet.getFunnels().keySet()) { final Funnel funnel = processGroup.getFunnel(funnelId); if (funnel == null) { throw new IllegalStateException("A funnel in this snippet could not be found."); @@ -125,7 +131,7 @@ public final class SnippetUtils { // add any 
input ports if (!snippet.getInputPorts().isEmpty()) { final Set<PortDTO> inputPorts = new LinkedHashSet<>(); - for (String inputPortId : snippet.getInputPorts().keySet()) { + for (final String inputPortId : snippet.getInputPorts().keySet()) { final Port inputPort = processGroup.getInputPort(inputPortId); if (inputPort == null) { throw new IllegalStateException("An input port in this snippet could not be found."); @@ -138,7 +144,7 @@ public final class SnippetUtils { // add any labels if (!snippet.getLabels().isEmpty()) { final Set<LabelDTO> labels = new LinkedHashSet<>(); - for (String labelId : snippet.getLabels().keySet()) { + for (final String labelId : snippet.getLabels().keySet()) { final Label label = processGroup.getLabel(labelId); if (label == null) { throw new IllegalStateException("A label in this snippet could not be found."); @@ -151,7 +157,7 @@ public final class SnippetUtils { // add any output ports if (!snippet.getOutputPorts().isEmpty()) { final Set<PortDTO> outputPorts = new LinkedHashSet<>(); - for (String outputPortId : snippet.getOutputPorts().keySet()) { + for (final String outputPortId : snippet.getOutputPorts().keySet()) { final Port outputPort = processGroup.getOutputPort(outputPortId); if (outputPort == null) { throw new IllegalStateException("An output port in this snippet could not be found."); @@ -164,12 +170,16 @@ public final class SnippetUtils { // add any process groups if (!snippet.getProcessGroups().isEmpty()) { final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>(); - for (String childGroupId : snippet.getProcessGroups().keySet()) { + for (final String childGroupId : snippet.getProcessGroups().keySet()) { final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId); if (childGroup == null) { throw new IllegalStateException("A process group in this snippet could not be found."); } - processGroups.add(dtoFactory.createProcessGroupDto(childGroup, recurse)); + + final ProcessGroupDTO childGroupDto = 
dtoFactory.createProcessGroupDto(childGroup, recurse); + processGroups.add(childGroupDto); + + addControllerServices(childGroup, childGroupDto); } snippetDto.setProcessGroups(processGroups); } @@ -177,7 +187,7 @@ public final class SnippetUtils { // add any remote process groups if (!snippet.getRemoteProcessGroups().isEmpty()) { final Set<RemoteProcessGroupDTO> remoteProcessGroups = new LinkedHashSet<>(); - for (String remoteProcessGroupId : snippet.getRemoteProcessGroups().keySet()) { + for (final String remoteProcessGroupId : snippet.getRemoteProcessGroups().keySet()) { final RemoteProcessGroup remoteProcessGroup = processGroup.getRemoteProcessGroup(remoteProcessGroupId); if (remoteProcessGroup == null) { throw new IllegalStateException("A remote process group in this snippet could not be found."); @@ -187,108 +197,63 @@ public final class SnippetUtils { snippetDto.setRemoteProcessGroups(remoteProcessGroups); } - if (includeControllerServices) { - Set<ControllerServiceDTO> controllerServices = snippetDto.getControllerServices(); - if (controllerServices == null) { - controllerServices = new HashSet<>(); - snippetDto.setControllerServices(controllerServices); - } - - addControllerServicesToSnippet(snippetDto, controllerServices); - } + snippetDto.setControllerServices(controllerServices); return snippetDto; } - private void addControllerServicesToSnippet(final FlowSnippetDTO snippetDto, final Set<ControllerServiceDTO> destinationSet) { - final Set<ProcessorDTO> processors = snippetDto.getProcessors(); - if (processors != null) { - for (final ProcessorDTO processorDto : processors) { - addControllerServicesToSnippet(snippetDto, processorDto, destinationSet); - } + private void addControllerServices(final ProcessGroup group, final ProcessGroupDTO dto) { + final FlowSnippetDTO contents = dto.getContents(); + if (contents == null) { + return; } - final Set<ProcessGroupDTO> childGroups = snippetDto.getProcessGroups(); - if (childGroups != null) { - for (final 
ProcessGroupDTO processGroupDto : childGroups) { - final FlowSnippetDTO childGroupDto = processGroupDto.getContents(); - if (childGroupDto != null) { - addControllerServicesToSnippet(childGroupDto, destinationSet); - } - } - } - } + final Set<ControllerServiceDTO> controllerServices = new HashSet<>(); - private void addControllerServicesToSnippet(final FlowSnippetDTO snippet, final ProcessorDTO processorDto, final Set<ControllerServiceDTO> destinationSet) { - final ProcessorConfigDTO configDto = processorDto.getConfig(); - if (configDto == null) { - return; + for (final ProcessorNode procNode : group.getProcessors()) { + final Set<ControllerServiceDTO> servicesForProcessor = getControllerServices(procNode.getProperties()); + controllerServices.addAll(servicesForProcessor); } - final Map<String, PropertyDescriptorDTO> descriptors = configDto.getDescriptors(); - final Map<String, String> properties = configDto.getProperties(); + contents.setControllerServices(controllerServices); - if (properties != null && descriptors != null) { - for (final Map.Entry<String, String> entry : properties.entrySet()) { - final String propName = entry.getKey(); - final String propValue = entry.getValue(); - if (propValue == null) { - continue; - } + // Map child process group ID to the child process group for easy lookup + final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream() + .collect(Collectors.toMap(childGroupDto -> childGroupDto.getId(), childGroupDto -> childGroupDto)); - final PropertyDescriptorDTO propertyDescriptorDto = descriptors.get(propName); - if (propertyDescriptorDto != null && propertyDescriptorDto.getIdentifiesControllerService() != null) { - final ControllerServiceNode serviceNode = flowController.getControllerServiceNode(propValue); - if (serviceNode != null) { - addControllerServicesToSnippet(snippet, serviceNode, destinationSet); - } - } + for (final ProcessGroup childGroup : group.getProcessGroups()) { + final ProcessGroupDTO 
childDto = childGroupMap.get(childGroup.getIdentifier()); + if (childDto == null) { + continue; } - } - } - private void addControllerServicesToSnippet(final FlowSnippetDTO snippet, final ControllerServiceNode serviceNode, final Set<ControllerServiceDTO> destinationSet) { - if (isServicePresent(serviceNode.getIdentifier(), snippet.getControllerServices())) { - return; + addControllerServices(childGroup, childDto); } + } - final ControllerServiceDTO serviceNodeDto = dtoFactory.createControllerServiceDto(serviceNode); - destinationSet.add(serviceNodeDto); + private Set<ControllerServiceDTO> getControllerServices(final Map<PropertyDescriptor, String> componentProperties) { + final Set<ControllerServiceDTO> serviceDtos = new HashSet<>(); - for (final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet()) { + for (final Map.Entry<PropertyDescriptor, String> entry : componentProperties.entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); - final String propertyValue = entry.getValue(); - - if (descriptor.getControllerServiceDefinition() != null && propertyValue != null) { - final ControllerServiceNode referencedNode = flowController.getControllerServiceNode(propertyValue); - if (referencedNode == null) { - throw new IllegalStateException("Controller Service with ID " + propertyValue + " is referenced in template but cannot be found"); - } - - final String referencedNodeId = referencedNode.getIdentifier(); + if (descriptor.getControllerServiceDefinition() != null) { + final String controllerServiceId = entry.getValue(); + if (controllerServiceId != null) { + final ControllerServiceNode serviceNode = flowController.getControllerServiceNode(controllerServiceId); + if (serviceNode != null) { + serviceDtos.add(dtoFactory.createControllerServiceDto(serviceNode)); - final boolean alreadyPresent = isServicePresent(referencedNodeId, snippet.getControllerServices()); - if (!alreadyPresent) { - addControllerServicesToSnippet(snippet, 
referencedNode, destinationSet); + final Set<ControllerServiceDTO> recursiveRefs = getControllerServices(serviceNode.getProperties()); + serviceDtos.addAll(recursiveRefs); + } } } } - } - - private boolean isServicePresent(final String serviceId, final Collection<ControllerServiceDTO> services) { - if (services == null) { - return false; - } - for (final ControllerServiceDTO existingService : services) { - if (serviceId.equals(existingService.getId())) { - return true; - } - } - - return false; + return serviceDtos; } + public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed) { final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed); resolveNameConflicts(snippetCopy, group); @@ -346,7 +311,7 @@ public final class SnippetUtils { } private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap, Map<String, String> serviceIdMap, - String idGenerationSeed) { + final String idGenerationSeed) { final FlowSnippetDTO snippetContentsCopy = new FlowSnippetDTO(); // @@ -354,39 +319,40 @@ public final class SnippetUtils { // if (serviceIdMap == null) { serviceIdMap = new HashMap<>(); - final Set<ControllerServiceDTO> services = new HashSet<>(); - if (snippetContents.getControllerServices() != null) { - for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) { - final ControllerServiceDTO service = dtoFactory.copy(serviceDTO); - service.setId(generateId(serviceDTO.getId(), idGenerationSeed)); - service.setState(ControllerServiceState.DISABLED.name()); - services.add(service); - - // Map old service ID to new service ID so that we can make sure that we reference the new ones. 
- serviceIdMap.put(serviceDTO.getId(), service.getId()); - } + } + + final Set<ControllerServiceDTO> services = new HashSet<>(); + if (snippetContents.getControllerServices() != null) { + for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) { + final ControllerServiceDTO service = dtoFactory.copy(serviceDTO); + service.setId(generateId(serviceDTO.getId(), idGenerationSeed)); + service.setState(ControllerServiceState.DISABLED.name()); + services.add(service); + + // Map old service ID to new service ID so that we can make sure that we reference the new ones. + serviceIdMap.put(serviceDTO.getId(), service.getId()); } + } - // if there is any controller service that maps to another controller service, update the id's - for (final ControllerServiceDTO serviceDTO : services) { - final Map<String, String> properties = serviceDTO.getProperties(); - final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors(); - if (properties != null && descriptors != null) { - for (final PropertyDescriptorDTO descriptor : descriptors.values()) { - if (descriptor.getIdentifiesControllerService() != null) { - final String currentServiceId = properties.get(descriptor.getName()); - if (currentServiceId == null) { - continue; - } - - final String newServiceId = serviceIdMap.get(currentServiceId); - properties.put(descriptor.getName(), newServiceId); + // if there is any controller service that maps to another controller service, update the id's + for (final ControllerServiceDTO serviceDTO : services) { + final Map<String, String> properties = serviceDTO.getProperties(); + final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors(); + if (properties != null && descriptors != null) { + for (final PropertyDescriptorDTO descriptor : descriptors.values()) { + if (descriptor.getIdentifiesControllerService() != null) { + final String currentServiceId = properties.get(descriptor.getName()); + if (currentServiceId == null) { 
+ continue; } + + final String newServiceId = serviceIdMap.get(currentServiceId); + properties.put(descriptor.getName(), newServiceId); } } } - snippetContentsCopy.setControllerServices(services); } + snippetContentsCopy.setControllerServices(services); // // Copy the labels @@ -601,11 +567,11 @@ public final class SnippetUtils { } /* setters */ - public void setDtoFactory(DtoFactory dtoFactory) { + public void setDtoFactory(final DtoFactory dtoFactory) { this.dtoFactory = dtoFactory; } - public void setFlowController(FlowController flowController) { + public void setFlowController(final FlowController flowController) { this.flowController = flowController; }
