NIFI-1801: Scope Templates to Process Groups. This closes #446.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/270944ec Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/270944ec Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/270944ec Branch: refs/heads/master Commit: 270944ec692e12c221cdff202bdab56309dfcfd7 Parents: 347b281 Author: Mark Payne <[email protected]> Authored: Thu May 12 13:09:36 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Mon May 16 16:12:43 2016 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/nifi/web/Revision.java | 5 - .../apache/nifi/web/api/dto/TemplateDTO.java | 15 +- .../nifi/cluster/protocol/StandardDataFlow.java | 11 +- .../protocol/jaxb/message/AdaptedDataFlow.java | 9 - .../protocol/jaxb/message/DataFlowAdapter.java | 3 +- .../impl/NodeProtocolSenderImplTest.java | 2 +- .../ClusterProtocolHeartbeatMonitor.java | 17 - .../endpoints/FlowSnippetEndpointMerger.java | 44 +- .../http/endpoints/ProcessorEndpointMerger.java | 14 + .../RemoteProcessGroupEndpointMerger.java | 13 + .../http/replication/ResponseUtils.java | 2 +- .../ThreadPoolRequestReplicator.java | 30 +- .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 4 +- .../cluster/manager/impl/WebClusterManager.java | 6 +- .../impl/DataFlowManagementServiceImplTest.java | 14 +- .../apache/nifi/cluster/protocol/DataFlow.java | 5 - .../org/apache/nifi/controller/Template.java | 191 +++++++ .../org/apache/nifi/groups/ProcessGroup.java | 42 ++ .../apache/nifi/controller/FlowController.java | 75 +-- .../nifi/controller/StandardFlowService.java | 100 +++- .../controller/StandardFlowSynchronizer.java | 99 +--- .../org/apache/nifi/controller/Template.java | 37 -- .../apache/nifi/controller/TemplateManager.java | 524 ------------------- .../apache/nifi/controller/TemplateUtils.java | 287 ++++++++++ .../serialization/StandardFlowSerializer.java | 26 + .../nifi/fingerprint/FingerprintFactory.java | 5 +- .../nifi/groups/StandardProcessGroup.java | 95 ++++ .../nifi/persistence/TemplateSerializer.java | 1 + .../controller/StandardFlowServiceTest.java | 16 +- .../service/mock/MockProcessGroup.java | 30 ++ .../org/apache/nifi/web/NiFiServiceFacade.java | 7 +- .../nifi/web/StandardNiFiServiceFacade.java | 300 ++++++++--- .../nifi/web/api/ProcessGroupResource.java | 11 +- .../apache/nifi/web/api/ProcessorResource.java | 23 +- .../config/IllegalArgumentExceptionMapper.java | 5 +- .../nifi/web/controller/ControllerFacade.java | 2 +- .../org/apache/nifi/web/dao/TemplateDAO.java | 6 +- .../nifi/web/dao/impl/StandardTemplateDAO.java | 45 +- .../nifi/web/revision/NaiveRevisionManager.java | 131 +++-- .../nifi/web/revision/RevisionManager.java | 8 + .../web/revision/TestNaiveRevisionManager.java | 65 ++- 41 files changed, 1325 insertions(+), 1000 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-api/src/main/java/org/apache/nifi/web/Revision.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java index 0533307..4c47dde 100644 --- a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java +++ b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java @@ -81,11 +81,6 @@ public class Revision implements Serializable { return false; } - // TODO: THIS IS FOR TESTING PURPOSES! DO NOT LET THIS GET CHECKED IN THIS WAY!!!!!!!!!!!! - if (true) { - return true; - } - Revision thatRevision = (Revision) obj; // ensure that component ID's are the same (including null) if (thatRevision.getComponentId() == null && getComponentId() != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java index 6fa9daf..4f54923 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java @@ -31,6 +31,7 @@ public class TemplateDTO { private String uri; private String id; + private String groupId; private String name; private String description; private Date timestamp; @@ -40,9 +41,7 @@ public class TemplateDTO { /** * @return id for this template */ - @ApiModelProperty( - value = "The id of the template." - ) + @ApiModelProperty("The id of the template.") public String getId() { return id; } @@ -51,6 +50,16 @@ public class TemplateDTO { this.id = id; } + @ApiModelProperty("The id of the Process Group that the template belongs to.") + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + /** * @return uri for this template */ http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java index 3b6d110..fc3558a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java @@ -29,9 +29,9 @@ import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter; */ @XmlJavaTypeAdapter(DataFlowAdapter.class) public class StandardDataFlow implements Serializable, DataFlow { + private static final long serialVersionUID = 1L; private final byte[] flow; - private final byte[] templateBytes; private final byte[] snippetBytes; private boolean autoStartProcessors; @@ -40,23 +40,20 @@ public class StandardDataFlow implements Serializable, DataFlow { * Constructs an instance. * * @param flow a valid flow as bytes, which cannot be null - * @param templateBytes an XML representation of templates. May be null. * @param snippetBytes an XML representation of snippets. May be null. * * @throws NullPointerException if flow is null */ - public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) { + public StandardDataFlow(final byte[] flow, final byte[] snippetBytes) { if(flow == null){ throw new NullPointerException("Flow cannot be null"); } this.flow = flow; - this.templateBytes = templateBytes; this.snippetBytes = snippetBytes; } public StandardDataFlow(final DataFlow toCopy) { this.flow = copy(toCopy.getFlow()); - this.templateBytes = copy(toCopy.getTemplates()); this.snippetBytes = copy(toCopy.getSnippets()); this.autoStartProcessors = toCopy.isAutoStartProcessors(); } @@ -70,10 +67,6 @@ public class StandardDataFlow implements Serializable, DataFlow { return flow; } - @Override - public byte[] getTemplates() { - return templateBytes; - } @Override public byte[] getSnippets() { http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java index 683fdf5..62796d7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java @@ -21,7 +21,6 @@ package org.apache.nifi.cluster.protocol.jaxb.message; public class AdaptedDataFlow { private byte[] flow; - private byte[] templates; private byte[] snippets; private boolean autoStartProcessors; @@ -37,14 +36,6 @@ public class AdaptedDataFlow { this.flow = flow; } - public byte[] getTemplates() { - return templates; - } - - public void setTemplates(byte[] templates) { - this.templates = templates; - } - public byte[] getSnippets() { return snippets; } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java index 520b8eb..bd3e69e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java @@ -31,7 +31,6 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlo if (df != null) { aDf.setFlow(df.getFlow()); - aDf.setTemplates(df.getTemplates()); aDf.setSnippets(df.getSnippets()); aDf.setAutoStartProcessors(df.isAutoStartProcessors()); } @@ -41,7 +40,7 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlo @Override public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) { - final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getTemplates(), aDf.getSnippets()); + final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getSnippets()); dataFlow.setAutoStartProcessors(aDf.isAutoStartProcessors()); return dataFlow; } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java index f9a986f..bd57fe4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java @@ -106,7 +106,7 @@ public class NodeProtocolSenderImplTest { when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); ConnectionResponseMessage mockMessage = new ConnectionResponseMessage(); mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, - new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), null, null, UUID.randomUUID().toString())); + new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0]), null, null, UUID.randomUUID().toString())); when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage); ConnectionRequestMessage request = new ConnectionRequestMessage(); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index d9ef0be..f172915 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -47,7 +47,6 @@ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; -import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.NiFiProperties; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -183,23 +182,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im @Override public synchronized void removeHeartbeat(final NodeIdentifier nodeId) { logger.debug("Deleting heartbeat for node {}", nodeId); - final String nodeInfoPath = clusterNodesPath + "/" + nodeId.getId(); - heartbeatMessages.remove(nodeId); - - try { - getClient().delete().forPath(nodeInfoPath); - logger.info("Removed heartbeat from ZooKeeper for Node {}", nodeId); - } catch (final NoNodeException e) { - // node did not exist. Just return. - logger.debug("Attempted to remove heartbeat for Node with ID {} but no ZNode existed at {}", nodeId, nodeInfoPath); - return; - } catch (final Exception e) { - logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} due to {}", nodeId, e); - logger.warn("", e); - - clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed to remove node's heartbeat from ZooKeeper due to " + e); - } } protected Set<NodeIdentifier> getClusterNodeIds() { http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java index d7f6948..2063de4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java @@ -26,10 +26,10 @@ import java.util.regex.Pattern; import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.entity.FlowSnippetEntity; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.FlowEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; public class FlowSnippetEndpointMerger implements EndpointResponseMerger { public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); @@ -43,21 +43,21 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger { @Override public NodeResponse merge(final URI uri, final String method, Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { - final FlowSnippetEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class); - final FlowSnippetDTO contents = responseEntity.getContents(); + final FlowEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowEntity.class); + final FlowDTO flowDto = responseEntity.getFlow(); - if (contents == null) { + if (flowDto == null) { return clientResponse; } else { - final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>(); - final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>(); + final Map<String, Map<NodeIdentifier, ProcessorEntity>> processorMap = new HashMap<>(); + final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> remoteProcessGroupMap = new HashMap<>(); for (final NodeResponse nodeResponse : successfulResponses) { - final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class); - final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents(); + final FlowEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowEntity.class); + final FlowDTO nodeContents = nodeResponseEntity.getFlow(); - for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) { - Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId()); + for (final ProcessorEntity nodeProcessor : nodeContents.getProcessors()) { + Map<NodeIdentifier, ProcessorEntity> innerMap = processorMap.get(nodeProcessor.getId()); if (innerMap == null) { innerMap = new HashMap<>(); processorMap.put(nodeProcessor.getId(), innerMap); @@ -66,8 +66,8 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger { innerMap.put(nodeResponse.getNodeId(), nodeProcessor); } - for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) { - Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); + for (final RemoteProcessGroupEntity nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) { + Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); if (innerMap == null) { innerMap = new HashMap<>(); remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap); @@ -78,21 +78,19 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger { } final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); - for (final ProcessorDTO processor : contents.getProcessors()) { + for (final ProcessorEntity processor : flowDto.getProcessors()) { final String procId = processor.getId(); - final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId); + final Map<NodeIdentifier, ProcessorEntity> mergeMap = processorMap.get(procId); procMerger.mergeResponses(processor, mergeMap, successfulResponses, problematicResponses); } final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); - for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) { - if (remoteProcessGroup.getContents() != null) { - final String remoteProcessGroupId = remoteProcessGroup.getId(); - final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); + for (final RemoteProcessGroupEntity remoteProcessGroup : flowDto.getRemoteProcessGroups()) { + final String remoteProcessGroupId = remoteProcessGroup.getId(); + final Map<NodeIdentifier, RemoteProcessGroupEntity> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); - rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); - } + rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java index 6a040fa..a892e8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java @@ -72,4 +72,18 @@ public class ProcessorEndpointMerger extends AbstractSingleEntityEndpoint<Proces // set the merged the validation errors clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size())); } + + protected void mergeResponses(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap, final Set<NodeResponse> successfulResponses, + final Set<NodeResponse> problematicResponses) { + + final ProcessorDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { + final ProcessorEntity nodeProcEntity = entry.getValue(); + final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent(); + dtoMap.put(entry.getKey(), nodeProcDto); + } + + mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java index 94383de..56636fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.http.endpoints; import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -113,4 +114,16 @@ public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoi clientDto.setAuthorizationIssues(mergedAuthorizationIssues); } } + + protected void mergeResponses(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap, + Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + + final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { + dtoMap.put(entry.getKey(), entry.getValue().getComponent()); + } + + mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java index 8435c60..2fac89e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java @@ -39,7 +39,7 @@ public class ResponseUtils { public static Set<NodeIdentifier> findLongResponseTimes(final AsyncClusterResponse response, final double stdDeviationMultiple) { final Set<NodeIdentifier> slowResponses = new HashSet<>(); - if (response.isOlderThan(2, TimeUnit.SECONDS)) { + if (response.isOlderThan(1, TimeUnit.SECONDS)) { // If the response is older than 2 seconds, determines if any node took a long time to respond. final Set<NodeIdentifier> completedIds = response.getCompletedNodeIdentifiers(); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 3b71654..d218af2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -228,17 +228,24 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); } + logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); + + // Update headers to indicate the current revision so that we can + // prevent multiple users changing the flow at the same time + final Map<String, String> updatedHeaders = new HashMap<>(headers); + final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> UUID.randomUUID().toString()); + if (performVerification) { verifyState(method, uri.getPath()); } final int numRequests = responseMap.size(); if (numRequests >= MAX_CONCURRENT_REQUESTS) { + logger.debug("Cannot replicate request because there are {} outstanding HTTP Requests already", numRequests); throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); } // create the request objects and replicate to all nodes - final String requestId = UUID.randomUUID().toString(); final CompletionCallback completionCallback = clusterResponse -> onCompletedResponse(requestId); final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); @@ -249,10 +256,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { responseMap.put(requestId, response); } - // Update headers to indicate the current revision so that we can - // prevent multiple users changing the flow at the same time - final Map<String, String> updatedHeaders = new HashMap<>(headers); - updatedHeaders.put(REQUEST_TRANSACTION_ID, UUID.randomUUID().toString()); + logger.debug("For Request ID {}, response object is {}", requestId, response); // setRevision(updatedHeaders); // if mutable request, we have to do a two-phase commit where we ask each node to verify @@ -262,6 +266,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // replicate the actual request. final boolean mutableRequest = isMutableRequest(method, uri.getPath()); if (mutableRequest && performVerification) { + logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); performVerification(nodeIds, method, uri, entity, updatedHeaders, response); return response; } @@ -309,13 +314,22 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { @Override public void onCompletion(final NodeResponse nodeResponse) { - // Add the node response to our collection. - nodeResponses.add(nodeResponse); + // Add the node response to our collection. We later need to know whether or + // not this is the last node response, so we add the response and then check + // the size within a synchronized block to ensure that those two things happen + // atomically. Otherwise, we could have multiple threads checking the sizes of + // the sets at the same time, which could result in multiple threads performing + // the 'all nodes are complete' logic. + final boolean allNodesResponded; + synchronized (nodeResponses) { + nodeResponses.add(nodeResponse); + allNodesResponded = nodeResponses.size() == numNodes; + } try { // If we have all of the node responses, then we can verify the responses // and if good replicate the original request to all of the nodes. - if (nodeResponses.size() == numNodes) { + if (allNodesResponded) { // Check if we have any requests that do not have a 150-Continue status code. final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count(); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java index 335c0ef..7c99c93 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@ -512,7 +512,7 @@ public class DataFlowDaoImpl implements DataFlowDao { clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes)); } - final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); + final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, snippetBytes); dataFlow.setAutoStartProcessors(autoStart); return new ClusterDataFlow(dataFlow, clusterMetadata == null ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes); @@ -543,11 +543,9 @@ public class DataFlowDaoImpl implements DataFlowDao { final DataFlow dataFlow = clusterDataFlow.getDataFlow(); if (dataFlow == null) { writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes()); - writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]); writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]); } else { writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow()); - writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates()); writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets()); } writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, clusterDataFlow.getControllerServices()); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 7c2eabf..59d582b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -1957,10 +1957,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // disconnect problematic nodes if (!problematicNodeResponses.isEmpty()) { if (problematicNodeResponses.size() < nodeResponses.size()) { - logger.warn(String.format("The following nodes failed to process URI '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses)); - disconnectNodes(problematicNodeResponses, "Failed to process URI " + uriPath); + logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses)); + disconnectNodes(problematicNodeResponses, "Failed to process URI " + method + " " + uriPath); } else { - logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uriPath); + logger.warn("All nodes failed to process URI {} {}. As a result, no node will be disconnected from cluster", method, uriPath); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java index e526ea3..34189ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java @@ -145,7 +145,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowSingleNode() throws Exception { String flowStr = "<rootGroup />"; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); @@ -165,7 +165,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowWithSameNodeIds() throws Exception { String flowStr = "<rootGroup />"; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -193,7 +193,7 @@ public class DataFlowManagementServiceImplTest { String flowStr = "<rootGroup />"; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -214,7 +214,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowWithConstantNodeIdChanging() throws Exception { String flowStr = "<rootGroup />"; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -236,7 +236,7 @@ public class DataFlowManagementServiceImplTest { public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception { String flowStr = "<rootGroup />"; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); @@ -259,7 +259,7 @@ public class DataFlowManagementServiceImplTest { public void testStopRequestedWhileRetrieving() throws Exception { String flowStr = "<rootGroup />"; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); Set<NodeIdentifier> nodeIds = new HashSet<>(); for (int i = 0; i < 1000; i++) { nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false)); @@ -289,7 +289,7 @@ public class DataFlowManagementServiceImplTest { String flowStr = "<rootGroup />"; byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0]))); NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java index 57c1c30..312e3b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java @@ -24,11 +24,6 @@ public interface DataFlow { public byte[] getFlow(); /** - * @return the raw byte array of the templates - */ - public byte[] getTemplates(); - - /** * @return the raw byte array of the snippets */ public byte[] getSnippets(); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java new file mode 100644 index 0000000..b9fd0cb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationRequest; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; + +public class Template implements Authorizable { + + private final TemplateDTO dto; + private volatile ProcessGroup processGroup; + + public Template(final TemplateDTO dto) { + this.dto = dto; + } + + public String getIdentifier() { + return dto.getId(); + } + + /** + * Returns a TemplateDTO object that describes the contents of this Template + * + * @return template dto + */ + public TemplateDTO getDetails() { + return dto; + } + + public void setProcessGroup(final ProcessGroup group) { + this.processGroup = group; + } + + public ProcessGroup getProcessGroup() { + return processGroup; + } + + + @Override + public Authorizable getParentAuthorizable() { + return null; + } + + @Override + public Resource getResource() { + return ResourceFactory.getComponentResource(ResourceType.Template, dto.getId(), dto.getName()); + } + + private Set<Authorizable> getAuthorizableComponents() { + return getAuthorizableComponents(processGroup); + } + + private Set<Authorizable> getAuthorizableComponents(final ProcessGroup processGroup) { + final Set<Authorizable> authComponents = new HashSet<>(); + final FlowSnippetDTO snippet = dto.getSnippet(); + + authComponents.add(processGroup); + + // If there is any component in the DTO that still exists in the flow, check its authorizations + for (final ConnectionDTO connectionDto : snippet.getConnections()) { + final Connection connection = processGroup.getConnection(connectionDto.getId()); + if (connection != null) { + authComponents.add(connection); + } + } + + // TODO: Authorize Controller Services + for (final ControllerServiceDTO service : snippet.getControllerServices()) { + } + + for (final LabelDTO labelDto : snippet.getLabels()) { + final Label label = processGroup.getLabel(labelDto.getId()); + if (label != null) { + authComponents.add(label); + } + } + + for (final ProcessorDTO processorDto : snippet.getProcessors()) { + final ProcessorNode procNode = processGroup.getProcessor(processorDto.getId()); + if (procNode != null) { + authComponents.add(procNode); + } + } + + for (final RemoteProcessGroupDTO groupDto : snippet.getRemoteProcessGroups()) { + final RemoteProcessGroup rpg = processGroup.getRemoteProcessGroup(groupDto.getId()); + if (rpg != null) { + authComponents.add(rpg); + } + } + + for (final ProcessGroupDTO groupDto : snippet.getProcessGroups()) { + final ProcessGroup group = processGroup.getProcessGroup(groupDto.getId()); + if (group != null) { + authComponents.addAll(getAuthorizableComponents(processGroup)); + } + } + + return authComponents; + } + + @Override + public void authorize(final Authorizer authorizer, final RequestAction action) throws AccessDeniedException { + final AuthorizationResult result = checkAuthorization(authorizer, action, true); + if (Result.Denied.equals(result)) { + final String explanation = result.getExplanation() == null ? "Access is denied" : result.getExplanation(); + throw new AccessDeniedException(explanation); + } + } + + @Override + public AuthorizationResult checkAuthorization(final Authorizer authorizer, final RequestAction action) { + return checkAuthorization(authorizer, action, false); + } + + private AuthorizationResult checkAuthorization(final Authorizer authorizer, final RequestAction action, final boolean accessAttempt) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // TODO - include user details context + + // build the request + final AuthorizationRequest request = new AuthorizationRequest.Builder() + .identity(user.getIdentity()) + .anonymous(user.isAnonymous()) + .accessAttempt(accessAttempt) + .action(action) + .resource(getResource()) + .build(); + + // perform the authorization + final AuthorizationResult result = authorizer.authorize(request); + + // verify the results + if (Result.ResourceNotFound.equals(result.getResult())) { + for (final Authorizable child : getAuthorizableComponents()) { + final AuthorizationResult childResult = child.checkAuthorization(authorizer, action); + if (Result.Denied.equals(childResult)) { + return childResult; + } + } + + return AuthorizationResult.denied(); + } else { + return result; + } + } + + @Override + public String toString() { + return "Template[id=" + getIdentifier() + ", Name=" + dto.getName() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index e1f66ee..c3a4c8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -28,6 +28,7 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; +import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFile; @@ -768,4 +769,45 @@ public interface ProcessGroup extends Authorizable { * @throws IllegalStateException if the move is not valid at this time */ void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup); + + /** + * Adds the given template to this Process Group + * + * @param template the template to add + */ + void addTemplate(Template template); + + /** + * Removes the given template from the Process Group + * + * @param template the template to remove + */ + void removeTemplate(Template template); + + /** + * Returns the template with the given ID + * + * @param id the ID of the template + * @return the template with the given ID or <code>null</code> if no template + * exists in this Process Group with the given ID + */ + Template getTemplate(String id); + + /** + * @param id of the template + * @return the Template with the given ID, if it exists as a child or + * descendant of this ProcessGroup. This performs a recursive search of all + * descendant ProcessGroups + */ + Template findTemplate(String id); + + /** + * @return a Set of all Templates that belong to this Process Group + */ + Set<Template> getTemplates(); + + /** + * @return a Set of all Templates that belong to this Process Group and any descendant Process Groups + */ + Set<Template> findAllTemplates(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 4243712..73d36a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -82,8 +82,8 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; import org.apache.nifi.connectable.StandardConnection; -import org.apache.nifi.controller.cluster.Heartbeater; import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater; +import org.apache.nifi.controller.cluster.Heartbeater; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; @@ -220,7 +220,6 @@ import org.apache.nifi.web.api.dto.RelationshipDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; @@ -257,7 +256,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final ProvenanceEventRepository provenanceEventRepository; private final VolatileBulletinRepository bulletinRepository; private final StandardProcessScheduler processScheduler; - private final TemplateManager templateManager; private final SnippetManager snippetManager; private final long gracefulShutdownSeconds; private final ExtensionManager extensionManager; @@ -478,11 +476,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.configuredForClustering = configuredForClustering; this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); - try { - this.templateManager = new TemplateManager(properties.getTemplateDirectory()); - } catch (final IOException e) { - throw new RuntimeException(e); - } this.snippetManager = new SnippetManager(); @@ -1478,72 +1471,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - // - // Template access - // - /** - * Adds a template to this controller. The contents of this template must be part of the current flow. This is going create a template based on a snippet of this flow. - * - * @param dto template - * @return a copy of the given DTO - * @throws IOException if an I/O error occurs when persisting the Template - * @throws NullPointerException if the DTO is null - * @throws IllegalArgumentException if does not contain all required information, such as the template name or a processor's configuration element - */ - public Template addTemplate(final TemplateDTO dto) throws IOException { - return templateManager.addTemplate(dto); - } - - /** - * Removes all templates from this controller - * - * @throws IOException ioe - */ - public void clearTemplates() throws IOException { - templateManager.clear(); - } - - /** - * Imports the specified template into this controller. The contents of this template may have come from another NiFi instance. - * - * @param dto dto - * @return template - * @throws IOException ioe - */ - public Template importTemplate(final TemplateDTO dto) throws IOException { - return templateManager.importTemplate(dto); - } - - /** - * @param id identifier - * @return the template with the given ID, or <code>null</code> if no template exists with the given ID - */ - public Template getTemplate(final String id) { - return templateManager.getTemplate(id); - } - - public TemplateManager getTemplateManager() { - return templateManager; - } - - /** - * @return all templates that this controller knows about - */ - public Collection<Template> getTemplates() { - return templateManager.getTemplates(); - } - - /** - * Removes the template with the given ID. - * - * @param id the ID of the template to remove - * @throws NullPointerException if the argument is null - * @throws IllegalStateException if no template exists with the given ID - * @throws IOException if template could not be removed - */ - public void removeTemplate(final String id) throws IOException, IllegalStateException { - templateManager.removeTemplate(id); - } private Position toPosition(final PositionDTO dto) { return new Position(dto.getX(), dto.getY()); http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index f8b3262..296659c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -16,20 +16,25 @@ */ package org.apache.nifi.controller; +import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; @@ -76,13 +81,16 @@ import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.lifecycle.LifeCycleStartException; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.persistence.FlowConfigurationDAO; import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO; +import org.apache.nifi.persistence.TemplateDeserializer; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.services.FlowService; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; +import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -522,16 +530,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler { final byte[] flowBytes = baos.toByteArray(); baos.reset(); - final byte[] templateBytes = controller.getTemplateManager().export(); final byte[] snippetBytes = controller.getSnippetManager().export(); // create the response final FlowResponseMessage response = new FlowResponseMessage(); - response.setDataFlow(new StandardDataFlow(flowBytes, templateBytes, snippetBytes)); + response.setDataFlow(new StandardDataFlow(flowBytes, snippetBytes)); return response; - } catch (final Exception ex) { throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, ex); } finally { @@ -606,27 +612,21 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { logger.trace("Loading flow from bytes"); - final TemplateManager templateManager = controller.getTemplateManager(); - templateManager.loadTemplates(); - logger.trace("Finished loading templates"); // resolve the given flow (null means load flow from disk) final DataFlow actualProposedFlow; final byte[] flowBytes; - final byte[] templateBytes; if (proposedFlow == null) { final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream(); copyCurrentFlow(flowOnDisk); flowBytes = flowOnDisk.toByteArray(); - templateBytes = templateManager.export(); logger.debug("Loaded Flow from bytes"); } else { flowBytes = proposedFlow.getFlow(); - templateBytes = proposedFlow.getTemplates(); logger.debug("Loaded flow from proposed flow"); } - actualProposedFlow = new StandardDataFlow(flowBytes, templateBytes, null); + actualProposedFlow = new StandardDataFlow(flowBytes, null); if (firstControllerInitialization) { // load the controller services @@ -642,6 +642,17 @@ public class StandardFlowService implements FlowService, ProtocolHandler { throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty"); } + final List<Template> templates = loadTemplates(); + for (final Template template : templates) { + final Template existing = rootGroup.getTemplate(template.getIdentifier()); + if (existing == null) { + logger.info("Imported Template '{}' to Root Group", template.getDetails().getName()); + rootGroup.addTemplate(template); + } else { + logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName()); + } + } + // lazy initialization of controller tasks and flow if (firstControllerInitialization) { logger.debug("First controller initialization. Loading reporting tasks and initializing controller."); @@ -653,6 +664,56 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } + /** + * In NiFi 0.x, templates were stored in a templates directory as separate files. They are + * now stored in the flow itself. If there already are templates in that directory, though, + * we want to restore them. + * + * @return the templates found in the templates directory + * @throws IOException if unable to read from the file system + */ + public List<Template> loadTemplates() throws IOException { + final NiFiProperties properties = NiFiProperties.getInstance(); + final Path templatePath = properties.getTemplateDirectory(); + + final File[] files = templatePath.toFile().listFiles(pathname -> { + final String lowerName = pathname.getName().toLowerCase(); + return lowerName.endsWith(".template") || lowerName.endsWith(".xml"); + }); + + if (files == null) { + return Collections.emptyList(); + } + + final List<Template> templates = new ArrayList<>(); + for (final File file : files) { + try (final FileInputStream fis = new FileInputStream(file); + final BufferedInputStream bis = new BufferedInputStream(fis)) { + + final TemplateDTO templateDto; + try { + templateDto = TemplateDeserializer.deserialize(bis); + } catch (final Exception e) { + logger.error("Unable to interpret " + file + " as a Template. Skipping file."); + continue; + } + + if (templateDto.getId() == null) { + // If there is no ID assigned, we need to assign one. We do this by generating + // an ID from the name. This is because we know that Template Names are unique + // and are consistent across all nodes in the cluster. + final String uuid = UUID.nameUUIDFromBytes(templateDto.getName().getBytes(StandardCharsets.UTF_8)).toString(); + templateDto.setId(uuid); + } + + final Template template = new Template(templateDto); + templates.add(template); + } + } + + return templates; + } + private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely) throws ConnectionException { writeLock.lock(); try { @@ -759,7 +820,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // start the processors as indicated by the dataflow controller.onFlowInitialized(dataFlow.isAutoStartProcessors()); - loadTemplates(dataFlow.getTemplates()); loadSnippets(dataFlow.getSnippets()); controller.startHeartbeating(); } catch (final UninheritableFlowException ufe) { @@ -794,17 +854,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } - public void loadTemplates(final byte[] bytes) throws IOException { - if (bytes.length == 0) { - return; - } - - controller.clearTemplates(); - - for (final Template template : TemplateManager.parseBytes(bytes)) { - controller.addTemplate(template.getDetails()); - } - } public void loadSnippets(final byte[] bytes) throws IOException { if (bytes.length == 0) { @@ -828,6 +877,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { @Override public void run() { + final ClassLoader currentCl = Thread.currentThread().getContextClassLoader(); + final ClassLoader cl = NarClassLoaders.getFrameworkClassLoader(); + Thread.currentThread().setContextClassLoader(cl); try { //Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again final SaveHolder holder = StandardFlowService.this.saveHolder.get(); @@ -864,6 +916,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // record the failed save as a bulletin final Bulletin saveFailureBulletin = BulletinFactory.createBulletin(EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable to save flow controller configuration."); controller.getBulletinRepository().addBulletin(saveFailureBulletin); + } finally { + if (currentCl != null) { + Thread.currentThread().setContextClassLoader(currentCl); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 97ed3a9..fe3e59b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -24,7 +24,6 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -98,6 +97,7 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -138,6 +138,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { @Override public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { + // TODO - Include templates + // handle corner cases involving no proposed flow if (proposedFlow == null) { if (controller.getGroup(controller.getRootGroupId()).isEmpty()) { @@ -204,14 +206,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { throw new FlowSerializationException(e); } - logger.trace("Exporting templates from controller"); - final byte[] existingTemplates = controller.getTemplateManager().export(); logger.trace("Exporting snippets from controller"); final byte[] existingSnippets = controller.getSnippetManager().export(); - final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingTemplates, existingSnippets); - - final boolean existingTemplatesEmpty = existingTemplates == null || existingTemplates.length == 0; + final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingSnippets); // check that the proposed flow is inheritable by the controller try { @@ -222,13 +220,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting); } } - if (!existingTemplatesEmpty) { - logger.trace("Checking template inheritability"); - final String problemInheriting = checkTemplateInheritability(existingDataFlow, proposedFlow); - if (problemInheriting != null) { - throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting); - } - } } catch (final FingerprintException fe) { throw new FlowSerializationException("Failed to generate flow fingerprints", fe); } @@ -301,16 +292,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - logger.trace("Synching templates"); - if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) { - // need to load templates - final TemplateManager templateManager = controller.getTemplateManager(); - final List<Template> proposedTemplateList = TemplateManager.parseBytes(proposedFlow.getTemplates()); - for (final Template template : proposedTemplateList) { - templateManager.addTemplate(template.getDetails()); - } - } - // clear the snippets that are currently in memory logger.trace("Clearing existing snippets"); final SnippetManager snippetManager = controller.getSnippetManager(); @@ -711,6 +692,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { updateControllerService(controller, serviceNodeElement, encryptor); } + // Replace the templates with those from the proposed flow + final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template"); + for (final Template template : processGroup.getTemplates()) { + processGroup.removeTemplate(template); + } + for (final Element templateElement : templateNodeList) { + final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement); + final Template template = new Template(templateDto); + processGroup.addTemplate(template); + } + return processGroup; } @@ -1052,6 +1044,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState); } + final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template"); + for (final Element templateNode : templateNodeList) { + final TemplateDTO templateDTO = TemplateUtils.parseDto(templateNode); + final Template template = new Template(templateDTO); + processGroup.addTemplate(template); + } + return processGroup; } @@ -1103,60 +1102,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return null; } - /** - * Returns true if the given controller can inherit the proposed flow without orphaning flow files. - * - * @param existingFlow flow - * @param proposedFlow the flow to inherit - * - * @return null if the controller can inherit the specified flow, an explanation of why it cannot be inherited otherwise - * - * @throws FingerprintException if flow fingerprints could not be generated - */ - public String checkTemplateInheritability(final DataFlow existingFlow, final DataFlow proposedFlow) throws FingerprintException { - if (existingFlow == null) { - return null; // no existing flow, so equivalent to proposed flow - } - - // check if the Flow is inheritable - final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor); - // check if the Templates are inheritable - final byte[] existingTemplateBytes = existingFlow.getTemplates(); - if (existingTemplateBytes == null || existingTemplateBytes.length == 0) { - return null; - } - - final List<Template> existingTemplates = TemplateManager.parseBytes(existingTemplateBytes); - final String existingTemplateFingerprint = fingerprintFactory.createFingerprint(existingTemplates); - if (existingTemplateFingerprint.trim().isEmpty()) { - return null; - } - - final byte[] proposedTemplateBytes = proposedFlow.getTemplates(); - if (proposedTemplateBytes == null || proposedTemplateBytes.length == 0) { - return "Proposed Flow does not contain any Templates but Current Flow does"; - } - - final List<Template> proposedTemplates = TemplateManager.parseBytes(proposedTemplateBytes); - final String proposedTemplateFingerprint = fingerprintFactory.createFingerprint(proposedTemplates); - if (proposedTemplateFingerprint.trim().isEmpty()) { - return "Proposed Flow does not contain any Templates but Current Flow does"; - } - - try { - final String existingTemplateMd5 = fingerprintFactory.md5Hash(existingTemplateFingerprint); - final String proposedTemplateMd5 = fingerprintFactory.md5Hash(proposedTemplateFingerprint); - - if (!existingTemplateMd5.equals(proposedTemplateMd5)) { - return findFirstDiscrepancy(existingTemplateFingerprint, proposedTemplateFingerprint, "Templates"); - } - } catch (final NoSuchAlgorithmException e) { - throw new FingerprintException(e); - } - - return null; - } - private String findFirstDiscrepancy(final String existing, final String proposed, final String comparisonDescription) { final int shortestFileLength = Math.min(existing.length(), proposed.length()); for (int i = 0; i < shortestFileLength; i++) { http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java deleted file mode 100644 index c2e8e04..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import org.apache.nifi.web.api.dto.TemplateDTO; - -public class Template { - - private final TemplateDTO dto; - - public Template(final TemplateDTO dto) { - this.dto = dto; - } - - /** - * Returns a TemplateDTO object that describes the contents of this Template - * - * @return template dto - */ - public TemplateDTO getDetails() { - return dto; - } -}
