NIFI-4436: - Initial checkpoint: able to start version control and detect changes, in standalone mode, still 'crude' implementation - Checkpoint: Can place flow under version control and can determine if modified - Checkpoint: Change version working in some cases. Does not work if processor removed because COMPONENT_REMOVED type has ComponentA whose ID is the VersionedComponentID but we are trying to call ProcessorDAO.get() with this ID - Checkpoint: Able to change flow from Version 1 to Version 2 and back. Not yet tested with controller services. Have not tried changing/removing connections. Not cluster-friendly yet. All inline, not in background. Have not taken into account ports, funnels, remote ports, etc. Have not tested with Labels yet - Checkpoint after implementing ClusterReplicationComponentLifecycle instead of JerseyClientComponentLifecycle - Checkpoint: Updated to allow starting version control and updating version in clustered mode - Checkpoint: Updated versioning endpoint so that when version of a flow is updated, the bundle information is populated and the snapshot is replicated to the cluster. - Checkpoint: Implemented endpoint for reverting to previously sync'ed version of a flow and updated version control endpoint so that Process Group can be pushed as a new version to existing flow instead of only creating a new flow - Checkpoint: Updated so that if a Process Group is under Version Control and it has a child Process Group, which is also under Version Control, we can handle that gracefully. Not yet tested because it depends on updates to the nifi-registry module, which can't be compiled due to maven dependency conflicts
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6a58d780 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6a58d780 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6a58d780 Branch: refs/heads/master Commit: 6a58d780d7041eb2b29d4da7a4c60c525d63f66d Parents: 8d4fe38 Author: Mark Payne <[email protected]> Authored: Thu Aug 10 10:02:35 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:52 2018 -0500 ---------------------------------------------------------------------- .../nifi/components/VersionedComponent.java | 39 + .../nifi-framework/nifi-client-dto/pom.xml | 4 + .../nifi/web/api/dto/AffectedComponentDTO.java | 31 +- .../apache/nifi/web/api/dto/ComponentDTO.java | 10 + .../apache/nifi/web/api/dto/ConnectableDTO.java | 11 + .../nifi/web/api/dto/ProcessGroupDTO.java | 12 +- .../web/api/dto/RemoteProcessGroupPortDTO.java | 10 + .../api/dto/VersionControlInformationDTO.java | 98 ++ .../nifi/web/api/dto/VersionedFlowDTO.java | 76 + .../api/dto/VersionedFlowUpdateRequestDTO.java | 90 ++ .../web/api/entity/AffectedComponentEntity.java | 4 +- .../VersionControlComponentMappingEntity.java | 59 + .../entity/VersionControlInformationEntity.java | 48 + .../web/api/entity/VersionedFlowEntity.java | 48 + .../api/entity/VersionedFlowSnapshotEntity.java | 58 + .../VersionedFlowUpdateRequestEntity.java | 49 + .../endpoints/ControllerEndpointMerger.java | 2 - .../FlowConfigurationEndpointMerger.java | 1 - ...VersionControlInformationEndpointMerger.java | 56 + .../http/replication/RequestReplicator.java | 77 +- .../ThreadPoolRequestReplicator.java | 29 +- .../cluster/manager/ConnectionEntityMerger.java | 1 - .../nifi/cluster/manager/PortEntityMerger.java | 1 - .../manager/ProcessGroupEntityMerger.java | 17 + .../cluster/manager/ProcessorEntityMerger.java | 1 - .../VersionControlInformationEntityMerger.java | 48 + ...FileBasedClusterNodeFirewallFactoryBean.java | 6 +- 
.../TestThreadPoolRequestReplicator.java | 7 +- .../apache/nifi/cluster/integration/Node.java | 3 +- .../nifi-framework-core-api/pom.xml | 4 + .../apache/nifi/connectable/Connectable.java | 4 +- .../org/apache/nifi/connectable/Connection.java | 3 +- .../apache/nifi/controller/AbstractPort.java | 25 + .../apache/nifi/controller/StandardFunnel.java | 25 + .../org/apache/nifi/controller/label/Label.java | 4 +- .../service/ControllerServiceNode.java | 3 +- .../org/apache/nifi/groups/ProcessGroup.java | 57 +- .../apache/nifi/groups/RemoteProcessGroup.java | 3 +- .../RemoteProcessGroupPortDescriptor.java | 5 + .../apache/nifi/registry/flow/FlowRegistry.java | 93 ++ .../nifi/registry/flow/FlowRegistryClient.java | 37 + .../registry/flow/UnknownResourceException.java | 33 + .../flow/VersionControlInformation.java | 69 + .../nifi-framework/nifi-framework-core/pom.xml | 8 + .../nifi/connectable/StandardConnection.java | 25 + .../apache/nifi/controller/FlowController.java | 39 +- .../controller/StandardFlowSynchronizer.java | 26 + .../nifi/controller/StandardProcessorNode.java | 24 + .../nifi/controller/label/StandardLabel.java | 32 + .../serialization/FlowFromDOMFactory.java | 26 + .../serialization/StandardFlowSerializer.java | 33 + .../service/ControllerServiceLoader.java | 1 + .../service/StandardControllerServiceNode.java | 24 + .../nifi/fingerprint/FingerprintFactory.java | 21 +- .../nifi/groups/StandardProcessGroup.java | 1181 ++++++++++++++- .../flow/FileBasedFlowRegistryClient.java | 404 +++++ .../flow/StandardVersionControlInformation.java | 89 ++ .../InstantiatedConnectableComponent.java | 40 + .../mapping/InstantiatedVersionedComponent.java | 24 + .../InstantiatedVersionedConnection.java | 40 + .../InstantiatedVersionedControllerService.java | 40 + .../mapping/InstantiatedVersionedFunnel.java | 40 + .../mapping/InstantiatedVersionedLabel.java | 40 + .../flow/mapping/InstantiatedVersionedPort.java | 40 + .../InstantiatedVersionedProcessGroup.java | 40 + 
.../mapping/InstantiatedVersionedProcessor.java | 40 + .../InstantiatedVersionedRemoteGroupPort.java | 40 + ...InstantiatedVersionedRemoteProcessGroup.java | 40 + .../flow/mapping/NiFiRegistryDtoMapper.java | 327 ++++ .../flow/mapping/NiFiRegistryFlowMapper.java | 397 +++++ .../nifi/remote/StandardRemoteProcessGroup.java | 24 + ...tandardRemoteProcessGroupPortDescriptor.java | 10 + .../nifi/spring/FlowControllerFactoryBean.java | 13 +- .../src/main/resources/FlowConfiguration.xsd | 21 +- .../src/main/resources/nifi-context.xml | 6 + .../controller/StandardFlowServiceTest.java | 3 +- .../nifi/controller/TestFlowController.java | 7 +- .../reporting/TestStandardReportingContext.java | 5 +- .../scheduling/TestProcessorLifecycle.java | 4 +- .../StandardFlowSerializerTest.java | 4 +- .../service/mock/MockProcessGroup.java | 36 + .../org/apache/nifi/web/NiFiServiceFacade.java | 187 ++- .../apache/nifi/web/NiFiServiceFacadeLock.java | 6 + .../nifi/web/StandardNiFiServiceFacade.java | 536 ++++++- .../nifi/web/api/ApplicationResource.java | 80 +- .../org/apache/nifi/web/api/FlowResource.java | 18 + .../nifi/web/api/ProcessGroupResource.java | 4 +- .../apache/nifi/web/api/VersionsResource.java | 1409 ++++++++++++++++++ .../web/api/concurrent/AsyncRequestManager.java | 162 ++ .../api/concurrent/AsynchronousWebRequest.java | 80 + .../nifi/web/api/concurrent/RequestManager.java | 69 + .../StandardAsynchronousWebRequest.java | 93 ++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 244 ++- .../apache/nifi/web/api/dto/EntityFactory.java | 8 + .../nifi/web/controller/ControllerFacade.java | 6 + .../apache/nifi/web/dao/ProcessGroupDAO.java | 24 + .../web/dao/impl/StandardProcessGroupDAO.java | 117 +- .../nifi/web/util/AffectedComponentUtils.java | 71 + .../nifi/web/util/CancellableTimedPause.java | 59 + .../ClusterReplicationComponentLifecycle.java | 445 ++++++ .../nifi/web/util/ComponentLifecycle.java | 62 + .../web/util/LifecycleManagementException.java | 34 + 
.../nifi/web/util/LocalComponentLifecycle.java | 312 ++++ .../src/main/resources/nifi-web-api-context.xml | 26 + .../nifi/web/revision/NaiveRevisionManager.java | 3 +- pom.xml | 11 + 106 files changed, 8319 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java b/nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java new file mode 100644 index 0000000..164a4f2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components; + +import java.util.Optional; + +public interface VersionedComponent { + + /** + * @return the unique identifier that maps this component to a component that is versioned + * in a Flow Registry, or <code>Optional.empty</code> if this component has not been saved to a Flow Registry. 
+ */ + Optional<String> getVersionedComponentId(); + + /** + * Updates the versioned component identifier + * + * @param versionedComponentId the identifier of the versioned component + * + * @throws IllegalStateException if this component is already under version control with a different ID and + * the given ID is not null + */ + void setVersionedComponentId(String versionedComponentId); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/pom.xml index b47b2bc..69bb4a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/pom.xml @@ -26,5 +26,9 @@ <groupId>io.swagger</groupId> <artifactId>swagger-annotations</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-data-model</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java index 3567b53..95024ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java @@ -26,6 +26,8 @@ import java.util.Collection; public class AffectedComponentDTO { public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR"; public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE"; + public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT"; + public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT"; private String processGroupId; private String id; @@ -54,7 +56,9 @@ public class AffectedComponentDTO { this.id = id; } - @ApiModelProperty(value = "The type of this component", allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE) + @ApiModelProperty(value = "The type of this component", + allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", " + + COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT) public String getReferenceType() { return referenceType; } @@ -73,21 +77,6 @@ public class AffectedComponentDTO { } /** - * @return scheduled state of the processor referencing a controller service. If this component is another service, this field represents the controller service state - */ - @ApiModelProperty( - value = "The scheduled state of a processor or reporting task referencing a controller service. If this component is another controller " - + "service, this field represents the controller service state." 
- ) - public String getState() { - return state; - } - - public void setState(String state) { - this.state = state; - } - - /** * @return active thread count for the referencing component */ @ApiModelProperty( @@ -114,4 +103,14 @@ public class AffectedComponentDTO { public void setValidationErrors(Collection<String> validationErrors) { this.validationErrors = validationErrors; } + + @ApiModelProperty("The scheduled state of a processor or reporting task referencing a controller service. If this component is another controller " + + "service, this field represents the controller service state.") + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDTO.java index 2feefd7..81915ee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDTO.java @@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType; public class ComponentDTO { private String id; + private String versionedComponentId; private String parentGroupId; private PositionDTO position; @@ -47,6 +48,15 @@ public class ComponentDTO { this.id = id; } + @ApiModelProperty("The ID of the corresponding component that is under version control") + public String getVersionedComponentId() { + return versionedComponentId; + } + + 
public void setVersionedComponentId(final String id) { + this.versionedComponentId = id; + } + /** * @return id for the parent group of this component if applicable, null otherwise */ http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectableDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectableDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectableDTO.java index b820479..a63872e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectableDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectableDTO.java @@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType; public class ConnectableDTO { private String id; + private String versionedComponentId; private String type; private String groupId; private String name; @@ -50,6 +51,16 @@ public class ConnectableDTO { this.id = id; } + @ApiModelProperty("The ID of the corresponding component that is under version control") + public String getVersionedComponentId() { + return versionedComponentId; + } + + public void setVersionedComponentId(final String id) { + this.versionedComponentId = id; + } + + /** * @return type of this connectable component */ http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java index c8e4a39..7faf10b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java @@ -30,6 +30,7 @@ public class ProcessGroupDTO extends ComponentDTO { private String name; private String comments; private Map<String, String> variables; + private VersionControlInformationDTO versionControlInfo; private Integer runningCount; private Integer stoppedCount; @@ -203,7 +204,6 @@ public class ProcessGroupDTO extends ComponentDTO { this.inactiveRemotePortCount = inactiveRemotePortCount; } - @ApiModelProperty(value = "The variables that are configured for the Process Group. Note that this map contains only " + "those variables that are defined on this Process Group and not any variables that are defined in the parent " + "Process Group, etc. 
I.e., this Map will not contain all variables that are accessible by components in this " @@ -215,4 +215,14 @@ public class ProcessGroupDTO extends ComponentDTO { public void setVariables(final Map<String, String> variables) { this.variables = variables; } + + @ApiModelProperty("The Version Control information that indicates which Flow Registry, and where in the Flow Registry, " + + "this Process Group is tracking to; or null if this Process Group is not under version control") + public VersionControlInformationDTO getVersionControlInformation() { + return versionControlInfo; + } + + public void setVersionControlInformation(final VersionControlInformationDTO versionControlInfo) { + this.versionControlInfo = versionControlInfo; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java index 59c5631..8b0ddb4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java @@ -27,6 +27,7 @@ public class RemoteProcessGroupPortDTO { private String id; private String targetId; + private String versionedComponentId; private String groupId; private String name; private String comments; @@ -52,6 +53,15 @@ public class RemoteProcessGroupPortDTO { this.comments = comments; } + @ApiModelProperty("The ID of the corresponding 
component that is under version control") + public String getVersionedComponentId() { + return versionedComponentId; + } + + public void setVersionedComponentId(final String id) { + this.versionedComponentId = id; + } + /** * @return number tasks that may transmit flow files to the target port concurrently */ http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java new file mode 100644 index 0000000..d27e830 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "versionControlInformation") +public class VersionControlInformationDTO { + private String groupId; + private String registryId; + private String bucketId; + private String flowId; + private Integer version; + private Boolean modified; + private Boolean current; + + @ApiModelProperty("The ID of the Process Group that is under version control") + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + @ApiModelProperty("The ID of the registry that the flow is stored in") + public String getRegistryId() { + return registryId; + } + + public void setRegistryId(final String registryId) { + this.registryId = registryId; + } + + @ApiModelProperty("The ID of the bucket that the flow is stored in") + public String getBucketId() { + return bucketId; + } + + public void setBucketId(final String bucketId) { + this.bucketId = bucketId; + } + + @ApiModelProperty("The ID of the flow") + public String getFlowId() { + return flowId; + } + + public void setFlowId(final String flowId) { + this.flowId = flowId; + } + + @ApiModelProperty("The version of the flow") + public Integer getVersion() { + return version; + } + + public void setVersion(final Integer version) { + this.version = version; + } + + @ApiModelProperty(readOnly=true, + value = "Whether or not the flow has been modified since it was last synced to the Flow Registry. The value will be null if this information is not yet known.") + public Boolean getModified() { + return modified; + } + + public void setModified(Boolean modified) { + this.modified = modified; + } + + @ApiModelProperty(readOnly=true, + value = "Whether or not this is the most recent version of the flow in the Flow Registry. 
The value will be null if this information is not yet known.") + public Boolean getCurrent() { + return current; + } + + public void setCurrent(Boolean current) { + this.current = current; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java new file mode 100644 index 0000000..27a83e6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "versionedFlow") +public class VersionedFlowDTO { + private String registryId = "default"; // placeholder for now. + private String bucketId; + private String flowId; + private String flowName; + private String description; + + @ApiModelProperty("The ID of the registry that the flow is tracked to") + public String getRegistryId() { + return registryId; + } + + public void setRegistryId(String registryId) { + this.registryId = registryId; + } + + @ApiModelProperty("The ID of the bucket where the flow is stored") + public String getBucketId() { + return bucketId; + } + + public void setBucketId(String bucketId) { + this.bucketId = bucketId; + } + + @ApiModelProperty(value = "The ID of the flow") + public String getFlowId() { + return flowId; + } + + public void setFlowId(String flowId) { + this.flowId = flowId; + } + + @ApiModelProperty("The name of the flow") + public String getFlowName() { + return flowName; + } + + public void setFlowName(String flowName) { + this.flowName = flowName; + } + + @ApiModelProperty("A description of the flow") + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java new file mode 100644 index 0000000..aa42bf6 
--- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import java.util.Date; + +@XmlType(name = "versionedFlowUpdateRequest") +public class VersionedFlowUpdateRequestDTO { + private String requestId; + private String processGroupId; + private String uri; + private Date lastUpdated; + private boolean complete = false; + private String failureReason; + + @ApiModelProperty("The unique ID of the Process Group that the variable registry belongs to") + public String getProcessGroupId() { + return processGroupId; + } + + public void setProcessGroupId(String processGroupId) { + this.processGroupId = processGroupId; + } + + @ApiModelProperty(value = "The unique ID of this request.", readOnly = true) + public String getRequestId() { + return requestId; + } + + public void setRequestId(String 
requestId) { + this.requestId = requestId; + } + + @ApiModelProperty(value = "The URI for future requests to this drop request.", readOnly = true) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty(value = "The last time this request was updated.", dataType = "string", readOnly = true) + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + @ApiModelProperty(value = "Whether or not this request has completed", readOnly = true) + public boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + @ApiModelProperty(value = "An explanation of why this request failed, or null if this request has not failed", readOnly = true) + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(String reason) { + this.failureReason = reason; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java index 0f28f73..e0d8496 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java @@ -14,7 +14,6 @@ * See 
the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.web.api.entity; import org.apache.nifi.web.api.dto.AffectedComponentDTO; @@ -33,12 +32,13 @@ public class AffectedComponentEntity extends ComponentEntity implements Permissi /** * @return variable referencing components that is being serialized */ + @Override public AffectedComponentDTO getComponent() { return component; } + @Override public void setComponent(AffectedComponentDTO component) { this.component = component; } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java new file mode 100644 index 0000000..e1bd6b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Map; + +@XmlRootElement(name = "versionControlComponentMappingEntity") +public class VersionControlComponentMappingEntity extends Entity { + private VersionControlInformationDTO versionControlDto; + private Map<String, String> versionControlComponentMapping; + private RevisionDTO processGroupRevision; + + @ApiModelProperty("The Version Control information") + public VersionControlInformationDTO getVersionControlInformation() { + return versionControlDto; + } + + public void setVersionControlInformation(VersionControlInformationDTO versionControlDto) { + this.versionControlDto = versionControlDto; + } + + @ApiModelProperty("The mapping of Versioned Component Identifiers to instance ID's") + public Map<String, String> getVersionControlComponentMapping() { + return versionControlComponentMapping; + } + + public void setVersionControlComponentMapping(Map<String, String> mapping) { + this.versionControlComponentMapping = mapping; + } + + @ApiModelProperty("The revision of the Process Group") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(RevisionDTO processGroupRevision) { + this.processGroupRevision = processGroupRevision; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java new file mode 100644 index 0000000..e8ec81f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "versionControlInformationEntity") +public class VersionControlInformationEntity extends Entity { + private VersionControlInformationDTO versionControlDto; + private RevisionDTO processGroupRevision; + + @ApiModelProperty("The Version Control information") + public VersionControlInformationDTO getVersionControlInformation() { + return versionControlDto; + } + + public void setVersionControlInformation(VersionControlInformationDTO versionControlDto) { + this.versionControlDto = versionControlDto; + } + + @ApiModelProperty("The Revision for the Process Group") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(RevisionDTO revision) { + this.processGroupRevision = revision; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java new file mode 100644 index 0000000..b94255a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionedFlowDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "versionedFlow") +public class VersionedFlowEntity extends Entity { + private VersionedFlowDTO versionedFlow; + private RevisionDTO processGroupRevision; + + @ApiModelProperty("The versioned flow") + public VersionedFlowDTO getVersionedFlow() { + return versionedFlow; + } + + public void setVersionedFlow(VersionedFlowDTO versionedFLow) { + this.versionedFlow = versionedFLow; + } + + @ApiModelProperty("The Revision of the Process Group under Version Control") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(final RevisionDTO revision) { + this.processGroupRevision = revision; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java new file mode 100644 index 0000000..170640d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.web.api.dto.RevisionDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "versionedFlowSnapshot") +public class VersionedFlowSnapshotEntity extends Entity { + private VersionedFlowSnapshot versionedFlowSnapshot; + private RevisionDTO processGroupRevision; + private String registryId; + + @ApiModelProperty("The versioned flow snapshot") + public VersionedFlowSnapshot getVersionedFlowSnapshot() { + return versionedFlowSnapshot; + } + + public void setVersionedFlow(VersionedFlowSnapshot versionedFlowSnapshot) { + this.versionedFlowSnapshot = versionedFlowSnapshot; + } + + @ApiModelProperty("The Revision of the Process Group under Version Control") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(final RevisionDTO revision) { + this.processGroupRevision = revision; + } + + @ApiModelProperty("The ID of the Registry that this flow belongs to") + public String getRegistryId() { + return registryId; + } + + public void setRegistryId(String registryId) { + this.registryId = registryId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java new file mode 100644 index 0000000..7211824 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "versionedFlowUpdateRequestEntity") +public class VersionedFlowUpdateRequestEntity extends Entity { + private VersionedFlowUpdateRequestDTO request; + private RevisionDTO processGroupRevision; + + @ApiModelProperty("The revision for the Process Group that owns this variable registry.") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(RevisionDTO revision) { + this.processGroupRevision = revision; + } + + @ApiModelProperty("The Versioned Flow Update Request") + public VersionedFlowUpdateRequestDTO getRequest() { + return request; + } + + public void setRequest(VersionedFlowUpdateRequestDTO request) { + this.request = request; + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java index 6e38860..804d59d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java @@ -33,7 +33,6 @@ import java.util.regex.Pattern; public class ControllerEndpointMerger extends AbstractSingleDTOEndpoint<ControllerEntity, ControllerDTO> { public static final Pattern CONTROLLER_URI_PATTERN = Pattern.compile("/nifi-api/site-to-site"); - private PortEntityMerger portMerger = new PortEntityMerger(); @Override protected Class<ControllerEntity> getEntityClass() { @@ -47,7 +46,6 @@ public class ControllerEndpointMerger extends AbstractSingleDTOEndpoint<Controll @Override protected void mergeResponses(ControllerDTO clientDto, Map<NodeIdentifier, ControllerDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - ControllerDTO mergedController = clientDto; final Map<String, Map<NodeIdentifier, PortDTO>> inputPortMap = new HashMap<>(); // map of port id to map of node id to port dto final Map<String, Map<NodeIdentifier, PortDTO>> outputPortMap = new HashMap<>(); // map of port id to map 
of node id to port dto http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowConfigurationEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowConfigurationEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowConfigurationEndpointMerger.java index 22fa684..6ba2859 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowConfigurationEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowConfigurationEndpointMerger.java @@ -47,7 +47,6 @@ public class FlowConfigurationEndpointMerger extends AbstractNodeStatusEndpoint< protected void mergeResponses(FlowConfigurationDTO clientDto, Map<NodeIdentifier, FlowConfigurationDTO> dtoMap, NodeIdentifier selectedNodeId) { for (final Map.Entry<NodeIdentifier, FlowConfigurationDTO> entry : dtoMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final FlowConfigurationDTO toMerge = entry.getValue(); if (toMerge != clientDto) { clientDto.setSupportsConfigurableAuthorizer(clientDto.getSupportsConfigurableAuthorizer() && toMerge.getSupportsConfigurableAuthorizer()); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java new file mode 100644 index 0000000..14e3bf6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.VersionControlInformationEntityMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; + +public class VersionControlInformationEndpointMerger extends AbstractSingleEntityEndpoint<VersionControlInformationEntity> implements EndpointResponseMerger { + public static final Pattern VERSION_CONTROL_URI_PATTERN = Pattern.compile("/nifi-api/versions/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); + private final VersionControlInformationEntityMerger versionControlInfoEntityMerger = new VersionControlInformationEntityMerger(); + + @Override + public boolean canHandle(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(method)) && (VERSION_CONTROL_URI_PATTERN.matcher(uri.getPath()).matches())) { + return true; + } + + return false; + } + + @Override + protected Class<VersionControlInformationEntity> getEntityClass() { + return VersionControlInformationEntity.class; + } + + @Override + protected void mergeResponses(final VersionControlInformationEntity clientEntity, final Map<NodeIdentifier, VersionControlInformationEntity> entityMap, + final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) { + + versionControlInfoEntityMerger.merge(clientEntity, entityMap); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index 1fd6a49..a7177d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.Map; import java.util.Set; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.cluster.protocol.NodeIdentifier; public interface RequestReplicator { @@ -71,7 +72,6 @@ public interface RequestReplicator { */ void shutdown(); - /** * Replicates a request to each node in the cluster. If the request attempts to modify the flow and there is a node * that is not currently connected, an Exception will be thrown. Otherwise, the returned AsyncClusterResponse object @@ -89,24 +89,64 @@ public interface RequestReplicator { AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers); /** + * Replicates a request to each node in the cluster. If the request attempts to modify the flow and there is a node + * that is not currently connected, an Exception will be thrown. Otherwise, the returned AsyncClusterResponse object + * will contain the results that are immediately available, as well as an identifier for obtaining an updated result + * later. NOTE: This method will ALWAYS indicate that the request has been replicated. 
+ * + * @param user the user making the request + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers + * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later + * @throws ConnectingNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the CONNECTING state + * @throws DisconnectedNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the DISCONNECTED state + */ + AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers); + + /** + * Requests are sent to each node in the given set of Node Identifiers. The returned AsyncClusterResponse object will contain + * the results that are immediately available, as well as an identifier for obtaining an updated result later. + * <p> + * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used. + * + * @param nodeIds the node identifiers + * @param user the user making the request + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers + * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request + * has already been replicated, so the receiving node will not replicate the request itself. + * @param performVerification if <code>true</code>, and the request is mutable, will verify that all nodes are connected before + * making the request and that all nodes are able to perform the request before actually attempting to perform the task. 
+ * If false, will perform no such verification + * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later + */ + AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, + boolean performVerification); + + /** * Requests are sent to each node in the given set of Node Identifiers. The returned AsyncClusterResponse object will contain * the results that are immediately available, as well as an identifier for obtaining an updated result later. * <p> * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used. * - * @param nodeIds the node identifiers - * @param method the HTTP method (e.g., POST, PUT) - * @param uri the base request URI (up to, but not including, the query string) - * @param entity an entity - * @param headers any HTTP headers - * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request - * has already been replicated, so the receiving node will not replicate the request itself. + * @param nodeIds the node identifiers + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers + * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request + * has already been replicated, so the receiving node will not replicate the request itself. * @param performVerification if <code>true</code>, and the request is mutable, will verify that all nodes are connected before - * making the request and that all nodes are able to perform the request before acutally attempting to perform the task. 
- * If false, will perform no such verification + * making the request and that all nodes are able to perform the request before actually attempting to perform the task. + * If false, will perform no such verification * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later */ - AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification); + AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, + boolean performVerification); /** @@ -122,6 +162,19 @@ public interface RequestReplicator { AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers); /** + * Forwards a request to the Cluster Coordinator so that it is able to replicate the request to all nodes in the cluster. + * + * @param coordinatorNodeId the node identifier of the Cluster Coordinator + * @param user the user making the request + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers + * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later + */ + AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers); + + /** * <p> * Returns an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier. 
* If the request is finished, meaning that all nodes in the cluster have reported back their status or have timed out, @@ -132,7 +185,7 @@ public interface RequestReplicator { * * @param requestIdentifier the identifier of the request to obtain a response for * @return an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier, or <code>null</code> if - * no request exists with the given identifier + * no request exists with the given identifier */ AsyncClusterResponse getClusterResponse(String requestIdentifier); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index bd7729c..bd1e4b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -195,8 +195,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { maintenanceExecutor.shutdown(); } + @Override public AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers) { + return replicate(NiFiUserUtils.getNiFiUser(), method, uri, entity, headers); + } + + @Override + public 
AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) { final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates(); final boolean mutable = isMutableRequest(method, uri.getPath()); @@ -237,11 +243,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds); - return replicate(nodeIdSet, method, uri, entity, headers, true, true); + return replicate(nodeIdSet, user, method, uri, entity, headers, true, true); } - void updateRequestHeaders(final Map<String, String> headers) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); + void updateRequestHeaders(final Map<String, String> headers, final NiFiUser user) { if (user == null) { throw new AccessDeniedException("Unknown user"); } @@ -279,6 +284,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { @Override public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, + final boolean indicateReplicated, final boolean performVerification) { + + return replicate(nodeIds, NiFiUserUtils.getNiFiUser(), method, uri, entity, headers, indicateReplicated, performVerification); + } + + @Override + public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, final NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers, final boolean indicateReplicated, final boolean performVerification) { final Map<String, String> updatedHeaders = new HashMap<>(headers); @@ -288,7 +300,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } // include the proxied entities header - updateRequestHeaders(updatedHeaders); + updateRequestHeaders(updatedHeaders, user); if (indicateReplicated) { // If we are replicating a request and indicating that it is replicated, then this means that we are @@ -324,12 +336,19 @@ 
public class ThreadPoolRequestReplicator implements RequestReplicator { } } + @Override public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinatorNodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers) { + return forwardToCoordinator(coordinatorNodeId, NiFiUserUtils.getNiFiUser(), method, uri, entity, headers); + } + + @Override + public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinatorNodeId, final NiFiUser user, final String method, + final URI uri, final Object entity, final Map<String, String> headers) { final Map<String, String> updatedHeaders = new HashMap<>(headers); // include the proxied entities header - updateRequestHeaders(updatedHeaders); + updateRequestHeaders(updatedHeaders, user); return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java index 89ac179..7e3bc5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java @@ -28,7 +28,6 @@ public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionE public void 
merge(ConnectionEntity clientEntity, Map<NodeIdentifier, ConnectionEntity> entityMap) { ComponentEntityMerger.super.merge(clientEntity, entityMap); for (Map.Entry<NodeIdentifier, ConnectionEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final ConnectionEntity entityStatus = entry.getValue(); if (entityStatus != clientEntity) { mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java index 2929741..3df3c16 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java @@ -31,7 +31,6 @@ public class PortEntityMerger implements ComponentEntityMerger<PortEntity>, Comp public void merge(PortEntity clientEntity, Map<NodeIdentifier, PortEntity> entityMap) { ComponentEntityMerger.super.merge(clientEntity, entityMap); for (Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final PortEntity entityStatus = entry.getValue(); if (entityStatus != clientEntity) { mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), 
entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java index 67278a7..457e75b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java @@ -17,6 +17,8 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.entity.ProcessGroupEntity; @@ -31,6 +33,7 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr final ProcessGroupEntity entityStatus = entry.getValue(); if (entityStatus != clientEntity) { mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); + mergeVersionControlInformation(clientEntity, entityStatus); } } } @@ -41,4 +44,18 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr StatusMerger.merge(clientStatus, 
clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), statusNodeIdentifier.getApiPort()); } + + private void mergeVersionControlInformation(ProcessGroupEntity targetGroup, ProcessGroupEntity toMerge) { + final ProcessGroupDTO targetGroupDto = targetGroup.getComponent(); + final ProcessGroupDTO toMergeGroupDto = toMerge.getComponent(); + + final VersionControlInformationDTO targetVersionControl = targetGroupDto.getVersionControlInformation(); + final VersionControlInformationDTO toMergeVersionControl = toMergeGroupDto.getVersionControlInformation(); + + if (targetVersionControl == null) { + targetGroupDto.setVersionControlInformation(toMergeGroupDto.getVersionControlInformation()); + } else if (toMergeVersionControl != null) { + targetVersionControl.setCurrent(Boolean.TRUE.equals(targetVersionControl.getCurrent()) && Boolean.TRUE.equals(toMergeVersionControl.getCurrent())); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java index 5c419e9..dffac49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java @@ -32,7 +32,6 @@ public class ProcessorEntityMerger implements 
ComponentEntityMerger<ProcessorEnt public void merge(ProcessorEntity clientEntity, Map<NodeIdentifier, ProcessorEntity> entityMap) { ComponentEntityMerger.super.merge(clientEntity, entityMap); for (Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final ProcessorEntity entityStatus = entry.getValue(); if (entityStatus != clientEntity) { mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java new file mode 100644 index 0000000..8d102df --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.manager; + +import java.util.Map; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; + +public class VersionControlInformationEntityMerger { + + public void merge(final VersionControlInformationEntity clientEntity, final Map<NodeIdentifier, VersionControlInformationEntity> entityMap) { + + final VersionControlInformationDTO clientDto = clientEntity.getVersionControlInformation(); + + // We need to merge the 'current' and 'modified' flags because these are updated by nodes in the background. Since + // the nodes can synchronize with the Flow Registry at different intervals, we have to determine how to handle these + // flags if different nodes report different values for them. 
+ entityMap.values().stream() + .filter(entity -> entity != clientEntity) + .forEach(entity -> { + final VersionControlInformationDTO dto = entity.getVersionControlInformation(); + + // We consider the flow to be current only if ALL nodes indicate that it is current + clientDto.setCurrent(Boolean.TRUE.equals(clientDto.getCurrent()) && Boolean.TRUE.equals(dto.getCurrent())); + + // We consider the flow to be modified if ANY node indicates that it is modified + clientDto.setModified(Boolean.TRUE.equals(clientDto.getModified()) || Boolean.TRUE.equals(dto.getModified())); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java index a86fc79..3e76de6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java @@ -25,14 +25,14 @@ import org.springframework.beans.factory.FactoryBean; /** * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance. 
*/ -public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean { +public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean<FileBasedClusterNodeFirewall> { private FileBasedClusterNodeFirewall firewall; private NiFiProperties properties; @Override - public Object getObject() throws Exception { + public FileBasedClusterNodeFirewall getObject() throws Exception { if (firewall == null) { final File config = properties.getClusterNodeFirewallFile(); final File restoreDirectory = properties.getRestoreDirectory(); @@ -44,7 +44,7 @@ public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean { } @Override - public Class getObjectType() { + public Class<FileBasedClusterNodeFirewall> getObjectType() { return FileBasedClusterNodeFirewall.class; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 836751c..1f0ceb5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.http.replication; import 
org.apache.commons.collections4.map.MultiValueMap; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserDetails; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.authorization.user.StandardNiFiUser.Builder; import org.apache.nifi.cluster.coordination.ClusterCoordinator; @@ -443,7 +444,7 @@ public class TestThreadPoolRequestReplicator { // ensure the proxied entities header is set final Map<String, String> updatedHeaders = new HashMap<>(); - replicator.updateRequestHeaders(updatedHeaders); + replicator.updateRequestHeaders(updatedHeaders, NiFiUserUtils.getNiFiUser()); // Pass in Collections.emptySet() for the node ID's so that an Exception is thrown replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(), @@ -501,7 +502,7 @@ public class TestThreadPoolRequestReplicator { // ensure the proxied entities header is set final Map<String, String> updatedHeaders = new HashMap<>(); - replicator.updateRequestHeaders(updatedHeaders); + replicator.updateRequestHeaders(updatedHeaders, NiFiUserUtils.getNiFiUser()); replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor); @@ -554,7 +555,7 @@ public class TestThreadPoolRequestReplicator { // ensure the proxied entities header is set final Map<String, String> updatedHeaders = new HashMap<>(); - replicator.updateRequestHeaders(updatedHeaders); + replicator.updateRequestHeaders(updatedHeaders, NiFiUserUtils.getNiFiUser()); replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java ---------------------------------------------------------------------- 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index cace715..44d4905 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -66,6 +66,7 @@ import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.NiFiProperties; @@ -147,7 +148,7 @@ public class Node { final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor(); flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties, null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, - heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY); + heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY, Mockito.mock(FlowRegistryClient.class)); try { flowController.initializeFlow(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml 
index 0cf5906..d1bce36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml @@ -58,5 +58,9 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-framework-authorization</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-data-model</artifactId> + </dependency> </dependencies> </project>
