NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated authorizations required for version control endpoints - Add state and percent complete fields to VersionedFlowUpdateRequestDTO
Signed-off-by: Matt Gilman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b00dff1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b00dff1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b00dff1 Branch: refs/heads/master Commit: 6b00dff1a88a00d87dc77480853cc7c5637ab774 Parents: 696d583 Author: Mark Payne <[email protected]> Authored: Sat Nov 4 14:19:49 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:53 2018 -0500 ---------------------------------------------------------------------- .../web/api/dto/ComponentDifferenceDTO.java | 100 ++++++ .../api/dto/VersionControlInformationDTO.java | 30 ++ .../api/dto/VersionedFlowUpdateRequestDTO.java | 30 ++ .../web/api/entity/FlowComparisonEntity.java | 40 +++ .../nifi-framework-core-api/pom.xml | 31 +- .../apache/nifi/registry/flow/FlowRegistry.java | 66 +++- .../registry/flow/UnknownResourceException.java | 33 -- .../flow/VersionControlInformation.java | 20 ++ .../nifi-framework/nifi-framework-core/pom.xml | 4 + .../nifi/connectable/StandardConnection.java | 6 +- .../controller/StandardFlowSynchronizer.java | 19 +- .../serialization/FlowFromDOMFactory.java | 3 + .../serialization/StandardFlowSerializer.java | 3 + .../nifi/groups/StandardProcessGroup.java | 316 ++++++++++++------- .../registry/flow/FileBasedFlowRegistry.java | 65 +++- .../registry/flow/RestBasedFlowRegistry.java | 235 ++++++++++++++ .../flow/StandardFlowRegistryClient.java | 20 ++ .../flow/StandardVersionControlInformation.java | 149 ++++++++- .../flow/mapping/NiFiRegistryFlowMapper.java | 30 +- .../src/main/resources/FlowConfiguration.xsd | 5 +- .../src/main/resources/nifi-context.xml | 4 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 45 ++- .../nifi/web/StandardNiFiServiceFacade.java | 98 +++++- .../org/apache/nifi/web/api/FlowResource.java | 3 +- .../nifi/web/api/ProcessGroupResource.java | 61 +++- 
.../apache/nifi/web/api/VersionsResource.java | 242 +++++++------- .../web/api/concurrent/AsyncRequestManager.java | 1 - .../api/concurrent/AsynchronousWebRequest.java | 28 +- .../StandardAsynchronousWebRequest.java | 52 ++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 64 +++- .../nifi/web/dao/impl/FlowRegistryDAO.java | 51 ++- .../nifi/web/dao/impl/StandardInputPortDAO.java | 8 +- .../web/dao/impl/StandardProcessGroupDAO.java | 24 +- .../nifi/web/util/CancellableTimedPause.java | 9 +- pom.xml | 6 + 35 files changed, 1494 insertions(+), 407 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDifferenceDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDifferenceDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDifferenceDTO.java new file mode 100644 index 0000000..8febce0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDifferenceDTO.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import java.util.List; +import java.util.Objects; + +import javax.xml.bind.annotation.XmlType; + +import io.swagger.annotations.ApiModelProperty; + +@XmlType(name = "componentDifference") +public class ComponentDifferenceDTO { + private String componentType; + private String componentId; + private String componentName; + private String processGroupId; + private List<String> differences; + + @ApiModelProperty("The type of component") + public String getComponentType() { + return componentType; + } + + public void setComponentType(String componentType) { + this.componentType = componentType; + } + + @ApiModelProperty("The ID of the component") + public String getComponentId() { + return componentId; + } + + public void setComponentId(String componentId) { + this.componentId = componentId; + } + + @ApiModelProperty("The name of the component") + public String getComponentName() { + return componentName; + } + + public void setComponentName(String componentName) { + this.componentName = componentName; + } + + @ApiModelProperty("The ID of the Process Group that the component belongs to") + public String getProcessGroupId() { + return processGroupId; + } + + public void setProcessGroupId(String processGroupId) { + this.processGroupId = processGroupId; + } + + @ApiModelProperty("The differences in the component between the two flows") + public List<String> getDifferences() { + return differences; + } + + public void setDifferences(List<String> differences) { + this.differences = differences; + } + + @Override + 
public int hashCode() { + return Objects.hash(componentId); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof ComponentDifferenceDTO)) { + return false; + } + + final ComponentDifferenceDTO other = (ComponentDifferenceDTO) obj; + return componentId.equals(other.getComponentId()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java index e9aa246..c31a957 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java @@ -25,9 +25,12 @@ import javax.xml.bind.annotation.XmlType; public class VersionControlInformationDTO { private String groupId; private String registryId; + private String registryName; private String bucketId; + private String bucketName; private String flowId; private String flowName; + private String flowDescription; private Integer version; private Boolean modified; private Boolean current; @@ -50,6 +53,15 @@ public class VersionControlInformationDTO { this.registryId = registryId; } + @ApiModelProperty(value = "The name of the registry that the flow is stored in", readOnly = true) + public String getRegistryName() { + return registryName; + } + + public void 
setRegistryName(final String registryName) { + this.registryName = registryName; + } + @ApiModelProperty("The ID of the bucket that the flow is stored in") public String getBucketId() { return bucketId; @@ -59,6 +71,15 @@ public class VersionControlInformationDTO { this.bucketId = bucketId; } + @ApiModelProperty(value = "The name of the bucket that the flow is stored in", readOnly = true) + public String getBucketName() { + return bucketName; + } + + public void setBucketName(String bucketName) { + this.bucketName = bucketName; + } + @ApiModelProperty("The ID of the flow") public String getFlowId() { return flowId; @@ -77,6 +98,15 @@ public class VersionControlInformationDTO { this.flowName = flowName; } + @ApiModelProperty("The description of the flow") + public String getFlowDescription() { + return flowDescription; + } + + public void setFlowDescription(String flowDescription) { + this.flowDescription = flowDescription; + } + @ApiModelProperty("The version of the flow") public Integer getVersion() { return version; http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java index aa42bf6..013b40d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java @@ -32,6 +32,9 @@ public class 
VersionedFlowUpdateRequestDTO { private Date lastUpdated; private boolean complete = false; private String failureReason; + private int percentComplete; + private String state; + private VersionControlInformationDTO versionControlInformation; @ApiModelProperty("The unique ID of the Process Group that the variable registry belongs to") public String getProcessGroupId() { @@ -87,4 +90,31 @@ public class VersionedFlowUpdateRequestDTO { public void setFailureReason(String reason) { this.failureReason = reason; } + + @ApiModelProperty(value = "The VersionControlInformation that describes where the Versioned Flow is located; this may not be populated until the request is completed.", readOnly = true) + public VersionControlInformationDTO getVersionControlInformation() { + return versionControlInformation; + } + + public void setVersionControlInformation(VersionControlInformationDTO versionControlInformation) { + this.versionControlInformation = versionControlInformation; + } + + @ApiModelProperty(value = "The state of the request", readOnly = true) + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + @ApiModelProperty(value = "The percentage complete for the request, between 0 and 100", readOnly = true) + public int getPercentComplete() { + return percentComplete; + } + + public void setPercentComplete(int percentComplete) { + this.percentComplete = percentComplete; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowComparisonEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowComparisonEntity.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowComparisonEntity.java new file mode 100644 index 0000000..bc0251b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowComparisonEntity.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.entity; + +import java.util.Set; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; + +import io.swagger.annotations.ApiModelProperty; + +@XmlRootElement(name = "flowComparisonEntity") +public class FlowComparisonEntity extends Entity { + private Set<ComponentDifferenceDTO> componentDifferences; + + @ApiModelProperty("The list of differences for each component in the flow that is not the same between the two flows") + public Set<ComponentDifferenceDTO> getComponentDifferences() { + return componentDifferences; + } + + public void setComponentDifferences(Set<ComponentDifferenceDTO> componentDifferences) { + this.componentDifferences = componentDifferences; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml index d1bce36..3b86428 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml @@ -1,19 +1,16 @@ <?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> @@ -62,5 +59,9 @@ <groupId>org.apache.nifi.registry</groupId> <artifactId>nifi-registry-data-model</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-client</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 4efff94..10db9cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -19,6 +19,7 @@ package org.apache.nifi.registry.flow; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryException; import java.io.IOException; import java.util.Set; @@ -71,7 +72,47 @@ public interface FlowRegistry { * @param user current user * @return buckets for this user */ - Set<Bucket> getBuckets(NiFiUser user) throws IOException; + Set<Bucket> getBuckets(NiFiUser user) throws IOException, NiFiRegistryException; + + /** + * Gets 
the bucket with the given ID + * + * @param bucketId the id of the bucket + * @param user user on whose behalf the request is being made + * @return the bucket with the given ID + */ + Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException; + + /** + * Gets the bucket with the given ID + * + * @param bucketId the id of the bucket + * @return the bucket with the given ID + */ + Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException; + + /** + * Retrieves the set of all Versioned Flows for the specified bucket + * + * @param bucketId the ID of the bucket + * @param user the user on whose behalf the request is being made + * @return the set of all Versioned Flows for the specified bucket + * @throws IOException if unable to communicate with the Flow Registry + * @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID + */ + Set<VersionedFlow> getFlows(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException; + + /** + * Retrieves the set of all versions of the specified flow + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @param user the user on whose behalf the request is being made + * @return the set of all versions of the specified flow + * @throws IOException if unable to communicate with the Flow Registry + * @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID + */ + Set<VersionedFlowSnapshotMetadata> getFlowVersions(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException; /** * Registers the given Versioned Flow with the Flow Registry @@ -80,9 +121,9 @@ public interface FlowRegistry { * @return the fully populated VersionedFlow * * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null - * @throws UnknownResourceException if the bucket id does not exist + * @throws 
NiFiRegistryException if the bucket id does not exist */ - VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, UnknownResourceException; + VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, NiFiRegistryException; /** * Adds the given snapshot to the Flow Registry for the given flow @@ -90,13 +131,14 @@ public interface FlowRegistry { * @param flow the Versioned Flow * @param snapshot the snapshot of the flow * @param comments any comments for the snapshot + * @param expectedVersion the version of the flow that we expect to save this snapshot as * @return the versioned flow snapshot * * @throws IOException if unable to communicate with the registry * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null - * @throws UnknownResourceException if the flow does not exist + * @throws NiFiRegistryException if the flow does not exist */ - VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException; + VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion) throws IOException, NiFiRegistryException; /** * Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow @@ -106,9 +148,9 @@ public interface FlowRegistry { * @return the latest version of the Flow * * @throws IOException if unable to communicate with the Flow Registry - * @throws UnknownResourceException if unable to find the bucket with the given ID or the flow with the given ID + * @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID */ - int getLatestVersion(String bucketId, String flowId) throws IOException, UnknownResourceException; + int getLatestVersion(String bucketId, String flowId) throws 
IOException, NiFiRegistryException; /** * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry @@ -119,12 +161,13 @@ public interface FlowRegistry { * @return the contents of the Flow from the Flow Registry * * @throws IOException if unable to communicate with the Flow Registry - * @throws UnknownResourceException if unable to find the contents of the flow due to the bucket or flow not existing, + * @throws NiFiRegistryException if unable to find the contents of the flow due to the bucket or flow not existing, * or the specified version of the flow not existing * @throws NullPointerException if any of the arguments is not specified * @throws IllegalArgumentException if the given version is less than 1 */ - VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, UnknownResourceException; + // TODO: Need to pass in user + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException; /** * Retrieves a VersionedFlow by bucket id and flow id @@ -134,7 +177,8 @@ public interface FlowRegistry { * @return the VersionedFlow for the given bucket and flow ID's * * @throws IOException if unable to communicate with the Flow Registry - * @throws UnknownResourceException if unable to find a flow with the given bucket ID and flow ID + * @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID */ - VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, UnknownResourceException; + // TODO: Need to pass in user + VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java deleted file mode 100644 index 8c95e67..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.registry.flow; - -public class UnknownResourceException extends Exception { - - public UnknownResourceException(String message) { - super(message); - } - - public UnknownResourceException(String message, Throwable cause) { - super(message, cause); - } - - public UnknownResourceException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java index ea70b1c..67c3635 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -33,16 +33,36 @@ public interface VersionControlInformation { String getRegistryIdentifier(); /** + * @return the name of the Flow Registry that this flow is tracking to + */ + String getRegistryName(); + + /** * @return the unique identifier of the bucket that this flow belongs to */ String getBucketIdentifier(); /** + * @return the name of the bucket that this flow belongs to + */ + String getBucketName(); + + /** * @return the unique identifier of this flow in the Flow Registry */ String getFlowIdentifier(); /** + * @return the name of the flow + */ + String getFlowName(); + + /** + * @return the description of the flow + */ + String getFlowDescription(); + + /** * 
@return the version of the flow in the Flow Registry that this flow is based on. */ int getVersion(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 6548004..8edf394 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -161,6 +161,10 @@ <artifactId>nifi-registry-flow-diff</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-client</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>${jackson.version}</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 7aa3003..d6ea4b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -270,8 +270,10 @@ public final class StandardConnection implements 
Connection { return; } - if (getSource().isRunning()) { - throw new IllegalStateException("Cannot update the relationships for Connection because the source of the Connection is running"); + try { + getSource().verifyCanUpdate(); + } catch (final IllegalStateException ise) { + throw new IllegalStateException("Cannot update the relationships for Connection", ise); } try { http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 5a7aeec..71a587c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -85,9 +85,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.StandardVersionControlInformation; -import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; @@ -1110,14 +1110,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final 
VersionControlInformationDTO versionControlInfoDto = processGroupDTO.getVersionControlInformation(); if (versionControlInfoDto != null) { - final String registryId = versionControlInfoDto.getRegistryId(); - final String bucketId = versionControlInfoDto.getBucketId(); - final String flowId = versionControlInfoDto.getFlowId(); - final int version = versionControlInfoDto.getVersion(); - final boolean modified = false; - final boolean current = true; - - final VersionControlInformation versionControlInformation = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, modified, current); + final FlowRegistry flowRegistry = controller.getFlowRegistryClient().getFlowRegistry(versionControlInfoDto.getRegistryId()); + final String registryName = flowRegistry == null ? versionControlInfoDto.getRegistryId() : flowRegistry.getName(); + + final StandardVersionControlInformation versionControlInformation = StandardVersionControlInformation.Builder.fromDto(versionControlInfoDto) + .registryName(registryName) + .modified(false) + .current(true) + .build(); + // pass empty map for the version control mapping because the VersionedComponentId has already been set on the components processGroup.setVersionControlInformation(versionControlInformation, Collections.emptyMap()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index a2a589a..e95845a 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -230,7 +230,10 @@ public class FlowFromDOMFactory { final VersionControlInformationDTO dto = new VersionControlInformationDTO(); dto.setRegistryId(getString(versionControlInfoElement, "registryId")); dto.setBucketId(getString(versionControlInfoElement, "bucketId")); + dto.setBucketName(getString(versionControlInfoElement, "bucketName")); dto.setFlowId(getString(versionControlInfoElement, "flowId")); + dto.setFlowName(getString(versionControlInfoElement, "flowName")); + dto.setFlowDescription(getString(versionControlInfoElement, "flowDescription")); dto.setVersion(getInt(versionControlInfoElement, "version")); return dto; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index f921bc6..b0fe508 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -190,7 +190,10 @@ public class StandardFlowSerializer implements 
FlowSerializer { final Element versionControlInfoElement = doc.createElement("versionControlInformation"); addTextElement(versionControlInfoElement, "registryId", versionControlInfo.getRegistryIdentifier()); addTextElement(versionControlInfoElement, "bucketId", versionControlInfo.getBucketIdentifier()); + addTextElement(versionControlInfoElement, "bucketName", versionControlInfo.getBucketName()); addTextElement(versionControlInfoElement, "flowId", versionControlInfo.getFlowIdentifier()); + addTextElement(versionControlInfoElement, "flowName", versionControlInfo.getFlowName()); + addTextElement(versionControlInfoElement, "flowDescription", versionControlInfo.getFlowDescription()); addTextElement(versionControlInfoElement, "version", versionControlInfo.getVersion()); element.appendChild(versionControlInfoElement); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 1d8652e..2783e96 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,6 +16,30 @@ */ package org.apache.nifi.groups; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -68,16 +92,17 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.Bundle; import org.apache.nifi.registry.flow.ConnectableComponent; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.StandardVersionControlInformation; -import org.apache.nifi.registry.flow.UnknownResourceException; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFunnel; import org.apache.nifi.registry.flow.VersionedLabel; @@ -88,11 +113,13 @@ import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import 
org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.FlowComparator; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; @@ -109,30 +136,6 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public final class StandardProcessGroup implements ProcessGroup { private final String id; @@ -2812,7 +2815,9 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final 
Map<String, String> versionedComponentIds) { - final StandardVersionControlInformation svci = new StandardVersionControlInformation(versionControlInformation.getRegistryIdentifier(), + final StandardVersionControlInformation svci = new StandardVersionControlInformation( + versionControlInformation.getRegistryIdentifier(), + versionControlInformation.getRegistryName(), versionControlInformation.getBucketIdentifier(), versionControlInformation.getFlowIdentifier(), versionControlInformation.getVersion(), @@ -2822,10 +2827,19 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public Optional<Boolean> getModified() { - return StandardProcessGroup.this.isModified(); + final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications(); + if (differences == null) { + return Optional.ofNullable(null); + } + + return Optional.of(!differences.isEmpty()); } }; + svci.setBucketName(versionControlInformation.getBucketName()); + svci.setFlowName(versionControlInformation.getFlowName()); + svci.setFlowDescription(versionControlInformation.getFlowDescription()); + writeLock.lock(); try { updateVersionedComponentIds(this, versionedComponentIds); @@ -2906,21 +2920,6 @@ public final class StandardProcessGroup implements ProcessGroup { return; } - try { - final int latestVersion = flowRegistry.getLatestVersion(vci.getBucketIdentifier(), vci.getFlowIdentifier()); - - if (latestVersion == vci.getVersion()) { - LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion); - vci.setCurrent(true); - } else { - vci.setCurrent(false); - LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}", - new Object[] {this, vci.getVersion(), latestVersion}); - } - } catch (final IOException | UnknownResourceException e) { - LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most 
recent version of the Flow in the Flow Registry", this, e); - } - final VersionedProcessGroup snapshot = vci.getFlowSnapshot(); if (snapshot == null) { // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry. @@ -2929,12 +2928,33 @@ public final class StandardProcessGroup implements ProcessGroup { final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion()); final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); vci.setFlowSnapshot(registryFlow); - } catch (final IOException | UnknownResourceException e) { + } catch (final IOException | NiFiRegistryException e) { LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}", new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e); return; } } + + try { + final VersionedFlow versionedFlow = flowRegistry.getVersionedFlow(vci.getBucketIdentifier(), vci.getFlowIdentifier()); + final int latestVersion = (int) versionedFlow.getVersionCount(); + + vci.setBucketName(versionedFlow.getBucketName()); + vci.setFlowName(versionedFlow.getName()); + vci.setFlowDescription(versionedFlow.getDescription()); + vci.setRegistryName(flowRegistry.getName()); + + if (latestVersion == vci.getVersion()) { + LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion); + vci.setCurrent(true); + } else { + vci.setCurrent(false); + LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}", + new Object[] {this, vci.getVersion(), latestVersion}); + } + } catch (final IOException | NiFiRegistryException e) { + LOG.error("Failed to synchronize {} with Flow Registry because could not determine 
the most recent version of the Flow in the Flow Registry", this, e); + } } @@ -2945,12 +2965,12 @@ public final class StandardProcessGroup implements ProcessGroup { verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient()); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow); + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set<String> updatedVersionedComponentIds = flowComparison.getDifferences().stream() @@ -2997,7 +3017,11 @@ public final class StandardProcessGroup implements ProcessGroup { .collect(Collectors.toSet()); final Set<String> variablesRemoved = new HashSet<>(existingVariableNames); - variablesRemoved.removeAll(proposed.getVariables().keySet()); + + if (proposed.getVariables() != null) { + variablesRemoved.removeAll(proposed.getVariables().keySet()); + } + final Map<String, String> updatedVariableMap = new HashMap<>(); variablesRemoved.forEach(var -> updatedVariableMap.put(var, null)); @@ -3017,12 +3041,25 @@ public final class StandardProcessGroup implements ProcessGroup { final String flowId = remoteCoordinates.getFlowId(); final int version = remoteCoordinates.getVersion(); - final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, proposed, false, true); + final FlowRegistry flowRegistry = 
flowController.getFlowRegistryClient().getFlowRegistry(registryId); + final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); + + final VersionControlInformation vci = new StandardVersionControlInformation.Builder() + .registryId(registryId) + .registryName(registryName) + .bucketId(bucketId) + .bucketName(bucketId) // bucket name not yet known + .flowId(flowId) + .flowName(flowId) // flow name not yet known + .version(version) + .modified(false) + .current(true) + .build(); + group.setVersionControlInformation(vci, Collections.emptyMap()); } // Child groups - // TODO: Need to take into account if child group is under version control pointing to a different Versioned Flow and if so need to handle it differently. final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream() .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); final Set<String> childGroupsRemoved = new HashSet<>(childGroupsByVersionedId.keySet()); @@ -3031,7 +3068,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); if (childGroup == null) { - final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed); + final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed); LOG.info("Added {} to {}", added, this); } else { updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName); @@ -3050,7 +3087,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedControllerService proposedService : proposed.getControllerServices()) { final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); if (service == null) { - final ControllerServiceNode added = addControllerService(proposedService, 
componentIdSeed); + final ControllerServiceNode added = addControllerService(group, proposedService, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { updateControllerService(service, proposedService); @@ -3069,7 +3106,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) { final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier()); if (funnel == null) { - final Funnel added = addFunnel(proposedFunnel, componentIdSeed); + final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) { updateFunnel(funnel, proposedFunnel); @@ -3090,7 +3127,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedPort proposedPort : proposed.getInputPorts()) { final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier()); if (port == null) { - final Port added = addInputPort(proposedPort, componentIdSeed); + final Port added = addInputPort(group, proposedPort, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { updatePort(port, proposedPort); @@ -3110,7 +3147,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedPort proposedPort : proposed.getOutputPorts()) { final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier()); if (port == null) { - final Port added = addOutputPort(proposedPort, componentIdSeed); + final Port added = addOutputPort(group, proposedPort, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { updatePort(port, proposedPort); @@ -3131,7 +3168,7 @@ public final class 
StandardProcessGroup implements ProcessGroup { for (final VersionedLabel proposedLabel : proposed.getLabels()) { final Label label = labelsByVersionedId.get(proposedLabel.getIdentifier()); if (label == null) { - final Label added = addLabel(proposedLabel, componentIdSeed); + final Label added = addLabel(group, proposedLabel, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) { updateLabel(label, proposedLabel); @@ -3153,19 +3190,21 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); if (processor == null) { - final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed); + final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed); - final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream() - .map(relName -> added.getRelationship(relName)) - .collect(Collectors.toSet()); + final Set<Relationship> proposedAutoTerminated = + proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream() + .map(relName -> added.getRelationship(relName)) + .collect(Collectors.toSet()); autoTerminatedRelationships.put(added, proposedAutoTerminated); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { updateProcessor(processor, proposedProcessor); - final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream() - .map(relName -> processor.getRelationship(relName)) - .collect(Collectors.toSet()); + final Set<Relationship> proposedAutoTerminated = + proposedProcessor.getAutoTerminatedRelationships() == null ? 
Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream() + .map(relName -> processor.getRelationship(relName)) + .collect(Collectors.toSet()); if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) { autoTerminatedRelationships.put(processor, proposedAutoTerminated); @@ -3188,7 +3227,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) { final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier()); if (rpg == null) { - final RemoteProcessGroup added = addRemoteProcessGroup(proposedRpg, componentIdSeed); + final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) { updateRemoteProcessGroup(rpg, proposedRpg); @@ -3209,7 +3248,7 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedConnection proposedConnection : proposed.getConnections()) { final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier()); if (connection == null) { - final Connection added = addConnection(proposedConnection, componentIdSeed); + final Connection added = addConnection(group, proposedConnection, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (!connection.getSource().isRunning() && !connection.getDestination().isRunning()) { // If the connection needs to be updated, then the source and destination will already have @@ -3306,20 +3345,22 @@ public final class StandardProcessGroup implements ProcessGroup { } - private ProcessGroup addProcessGroup(final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException { + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String 
componentIdSeed) throws ProcessorInstantiationException { final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); group.setVersionedComponentId(proposed.getIdentifier()); - addProcessGroup(group); + group.setParent(destination); updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true); + destination.addProcessGroup(group); return group; } private void updateConnection(final Connection connection, final VersionedConnection proposed) { - connection.setBendPoints(proposed.getBends().stream() - .map(pos -> new Position(pos.getX(), pos.getY())) - .collect(Collectors.toList())); + connection.setBendPoints(proposed.getBends() == null ? Collections.emptyList() : + proposed.getBends().stream() + .map(pos -> new Position(pos.getX(), pos.getY())) + .collect(Collectors.toList())); - connection.setDestination(getConnectable(proposed.getDestination())); + connection.setDestination(getConnectable(connection.getProcessGroup(), proposed.getDestination())); connection.setLabelIndex(proposed.getLabelIndex()); connection.setName(proposed.getName()); connection.setRelationships(proposed.getSelectedRelationships().stream() @@ -3332,7 +3373,7 @@ public final class StandardProcessGroup implements ProcessGroup { queue.setBackPressureObjectThreshold(proposed.getBackPressureObjectThreshold()); queue.setFlowFileExpiration(proposed.getFlowFileExpiration()); - final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers().stream() + final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers() == null ? 
Collections.emptyList() : proposed.getPrioritizers().stream() .map(prioritizerName -> { try { return flowController.createPrioritizer(prioritizerName); @@ -3345,14 +3386,14 @@ public final class StandardProcessGroup implements ProcessGroup { queue.setPriorities(prioritizers); } - private Connection addConnection(final VersionedConnection proposed, final String componentIdSeed) { - final Connectable source = getConnectable(proposed.getSource()); + private Connection addConnection(final ProcessGroup destinationGroup, final VersionedConnection proposed, final String componentIdSeed) { + final Connectable source = getConnectable(destinationGroup, proposed.getSource()); if (source == null) { throw new IllegalArgumentException("Connection has a source with identifier " + proposed.getIdentifier() + " but no component could be found in the Process Group with a corresponding identifier"); } - final Connectable destination = getConnectable(proposed.getDestination()); + final Connectable destination = getConnectable(destinationGroup, proposed.getDestination()); if (destination == null) { throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier() + " but no component could be found in the Process Group with a corresponding identifier"); @@ -3360,43 +3401,65 @@ public final class StandardProcessGroup implements ProcessGroup { final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships()); connection.setVersionedComponentId(proposed.getIdentifier()); - addConnection(connection); + destinationGroup.addConnection(connection); updateConnection(connection, proposed); return connection; } - private Connectable getConnectable(final ConnectableComponent connectableComponent) { + private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent) { final String id = 
connectableComponent.getId(); switch (connectableComponent.getType()) { case FUNNEL: - return getFunnels().stream() + return group.getFunnels().stream() .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) .findAny() .orElse(null); - case INPUT_PORT: - return getInputPorts().stream() + case INPUT_PORT: { + final Optional<Port> port = group.getInputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny(); + + if (port.isPresent()) { + return port.get(); + } + + return group.getProcessGroups().stream() + .flatMap(gr -> gr.getInputPorts().stream()) .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) .findAny() .orElse(null); - case OUTPUT_PORT: - return getOutputPorts().stream() + } + case OUTPUT_PORT: { + final Optional<Port> port = group.getOutputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny(); + + if (port.isPresent()) { + return port.get(); + } + + return group.getProcessGroups().stream() + .flatMap(gr -> gr.getOutputPorts().stream()) .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) .findAny() .orElse(null); + } case PROCESSOR: - return getProcessors().stream() + return group.getProcessors().stream() .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) .findAny() .orElse(null); case REMOTE_INPUT_PORT: { final String rpgId = connectableComponent.getGroupId(); - final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream() + final 
Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream() .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) .findAny(); @@ -3415,7 +3478,7 @@ public final class StandardProcessGroup implements ProcessGroup { } case REMOTE_OUTPUT_PORT: { final String rpgId = connectableComponent.getGroupId(); - final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream() + final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream() .filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> id.equals(component.getVersionedComponentId().get())) .findAny(); @@ -3471,7 +3534,7 @@ public final class StandardProcessGroup implements ProcessGroup { return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); } - private ControllerServiceNode addControllerService(final VersionedControllerService proposed, final String componentIdSeed) { + private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final String componentIdSeed) { final String type = proposed.getType(); final String id = generateUuid(componentIdSeed); @@ -3483,7 +3546,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ControllerServiceNode newService = flowController.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded); newService.setVersionedComponentId(proposed.getIdentifier()); - addControllerService(newService); + destination.addControllerService(newService); updateControllerService(newService, proposed); return newService; @@ -3493,10 +3556,10 @@ public final class StandardProcessGroup implements ProcessGroup { funnel.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); } - private Funnel addFunnel(final VersionedFunnel proposed, final String 
componentIdSeed) { + private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) { final Funnel funnel = flowController.createFunnel(generateUuid(componentIdSeed)); funnel.setVersionedComponentId(proposed.getIdentifier()); - addFunnel(funnel); + destination.addFunnel(funnel); updateFunnel(funnel, proposed); return funnel; @@ -3508,28 +3571,28 @@ public final class StandardProcessGroup implements ProcessGroup { port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); } - private Port addInputPort(final VersionedPort proposed, final String componentIdSeed) { + private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) { final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName()); port.setVersionedComponentId(proposed.getIdentifier()); - addInputPort(port); + destination.addInputPort(port); updatePort(port, proposed); return port; } - private Port addOutputPort(final VersionedPort proposed, final String componentIdSeed) { - final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName()); + private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) { + final Port port = flowController.createLocalOutputPort(generateUuid(componentIdSeed), proposed.getName()); port.setVersionedComponentId(proposed.getIdentifier()); - addOutputPort(port); + destination.addOutputPort(port); updatePort(port, proposed); return port; } - private Label addLabel(final VersionedLabel proposed, final String componentIdSeed) { + private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) { final Label label = flowController.createLabel(generateUuid(componentIdSeed), proposed.getLabel()); label.setVersionedComponentId(proposed.getIdentifier()); - 
addLabel(label); + destination.addLabel(label); updateLabel(label, proposed); return label; @@ -3542,12 +3605,12 @@ public final class StandardProcessGroup implements ProcessGroup { label.setValue(proposed.getLabel()); } - private ProcessorNode addProcessor(final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException { + private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException { final BundleCoordinate coordinate = toCoordinate(proposed.getBundle()); final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(componentIdSeed), coordinate, true); procNode.setVersionedComponentId(proposed.getIdentifier()); - addProcessor(procNode); + destination.addProcessor(procNode); updateProcessor(procNode, proposed); return procNode; @@ -3584,15 +3647,18 @@ public final class StandardProcessGroup implements ProcessGroup { fullPropertyMap.put(property.getName(), null); } - fullPropertyMap.putAll(proposedProperties); + if (proposedProperties != null) { + fullPropertyMap.putAll(proposedProperties); + } + return fullPropertyMap; } - private RemoteProcessGroup addRemoteProcessGroup(final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { + private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(componentIdSeed), proposed.getTargetUris()); rpg.setVersionedComponentId(proposed.getIdentifier()); - addRemoteProcessGroup(rpg); + destination.addRemoteProcessGroup(rpg); updateRemoteProcessGroup(rpg, proposed); return rpg; @@ -3601,12 +3667,12 @@ public final class StandardProcessGroup implements ProcessGroup { private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final 
VersionedRemoteProcessGroup proposed) { rpg.setComments(proposed.getComments()); rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout()); - rpg.setInputPorts(proposed.getInputPorts().stream() + rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream() .map(port -> createPortDescriptor(port)) .collect(Collectors.toSet())); rpg.setName(proposed.getName()); rpg.setNetworkInterface(proposed.getLocalNetworkInterface()); - rpg.setOutputPorts(proposed.getOutputPorts().stream() + rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream() .map(port -> createPortDescriptor(port)) .collect(Collectors.toSet())); rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); @@ -3633,7 +3699,7 @@ public final class StandardProcessGroup implements ProcessGroup { } - public Optional<Boolean> isModified() { + private Set<FlowDifference> getModifications() { final StandardVersionControlInformation vci = versionControlInfo.get(); // If this group is not under version control, then we need to notify the parent @@ -3645,17 +3711,17 @@ public final class StandardProcessGroup implements ProcessGroup { // say that the flow is modified. There would be no way to ever go back to the flow not being modified. // So we have to perform a diff of the flows and see if they are the same. if (vci == null) { - return Optional.of(Boolean.FALSE); + return null; } if (vci.getFlowSnapshot() == null) { // we haven't retrieved the flow from the Flow Registry yet, so we don't know if it's been modified. 
// As a result, we will just return an empty optional - return Optional.empty(); + return null; } final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient()); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true); final ComparableDataFlow currentFlow = new ComparableDataFlow() { @Override @@ -3681,17 +3747,16 @@ public final class StandardProcessGroup implements ProcessGroup { } }; - final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow); + final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow, new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set<FlowDifference> differences = comparison.getDifferences(); - final boolean modified = differences.stream() + final Set<FlowDifference> functionalDifferences = differences.stream() .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED) .filter(diff -> diff.getDifferenceType() != DifferenceType.STYLE_CHANGED) - .findAny() - .isPresent(); + .collect(Collectors.toSet()); - LOG.debug("There are {} differences between this Local FLow and the Versioned Flow: {}", differences.size(), differences); - return Optional.of(modified); + LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences); + return functionalDifferences; } @@ -3713,10 +3778,21 @@ public final class StandardProcessGroup implements ProcessGroup { + " synched with the Flow Registry before continuing. 
This will happen periodically in the background, so please try the request again later"); } + final Set<FlowDifference> modifications = getModifications(); + if (Boolean.TRUE.equals(modifiedOption.get())) { + final String changes = modifications.stream() + .map(FlowDifference::toString) + .collect(Collectors.joining("\n")); + + LOG.error("Cannot change the Version of the flow for {} because the Process Group has been modified ({} modifications) " + + "since it was last synchronized with the Flow Registry. The following differences were found:\n{}", + this, modifications.size(), changes); + throw new IllegalStateException("Cannot change the Version of the flow for " + this - + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be" - + " restored to its original form before changing the version"); + + " because the Process Group has been modified (" + modifications.size() + + " modifications) since it was last synchronized with the Flow Registry. The Process Group must be" + + " reverted to its original form before changing the version. 
See logs for more information on what has changed."); } } } @@ -3851,11 +3927,13 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().get())); for (final VersionedConnection connectionToAdd : proposedConnections.values()) { - for (final String prioritizerType : connectionToAdd.getPrioritizers()) { - try { - flowController.createPrioritizer(prioritizerType); - } catch (Exception e) { - throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e); + if (connectionToAdd.getPrioritizers() != null) { + for (final String prioritizerType : connectionToAdd.getPrioritizers()) { + try { + flowController.createPrioritizer(prioritizerType); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e); + } } } }
