NIFI-1678: - Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination - Added configuration options for ZooKeeper username & password for heartbeat management. Also addressed issue where nodes that were previously disconnected were asked to disconnect upon restart - Ensure that ACL is set properly when creating heartbeat node. Removed unused ControllerStartupFailureMessage.java - Changed ZooKeeper ACL's so that container nodes that would not be sensitive are wide open and removed the usage of username & password when communicating with ZooKeeper. This was done specifically because username/password combination is considered a 'testing' feature that should not be used in production and is not supported by Apache Curator - Refactored CuratorHeartbeatMonitor into an abstract heartbeat monitor that is responsible for processing heartbeats and CuratorHeartbeatMonitor that is responsible for retrieving heartbeat information - Refactored so that heartbeats are sent to Cluster Coordinator directly instead of to ZooKeeper. ZooKeeper is used to know which node is the cluster coordinator but heartbeats to the Cluster Coordinator provide additional information about the nodes. - Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination - Added configuration options for ZooKeeper username & password for heartbeat management. Also addressed issue where nodes that were previously disconnected were asked to disconnect upon restart - Changed ZooKeeper ACL's so that container nodes that would not be sensitive are wide open and removed the usage of username & password when communicating with ZooKeeper. This was done specifically because username/password combination is considered a 'testing' feature that should not be used in production and is not supported by Apache Curator
NIFI-1727: - Refactored logic for merging HTTP Requests that are federated across cluster NIFI-1745: - Refactoring how HTTP Requests are replicated to nodes - Bug fixes and continuing to work on replication refactoring. Still need to handle cluster locking and revisions - Begin work on RevisionManager - Resolved some issues that resulted from rebase - Fixed URIs to align with new URI's that will be used in 1.0.0 - This closes #413 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/04c41c06 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/04c41c06 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/04c41c06 Branch: refs/heads/master Commit: 04c41c0654b166c9f42844cc9907ea791f4563bc Parents: 3db14f5 Author: Mark Payne <[email protected]> Authored: Thu Mar 24 11:49:08 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Fri May 6 15:23:12 2016 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/controller/Snippet.java | 20 +- .../controller/status/ProcessGroupStatus.java | 1 + .../history/ComponentStatusRepository.java | 29 - .../main/java/org/apache/nifi/web/Revision.java | 47 +- nifi-assembly/pom.xml | 1 - .../org/apache/nifi/util/NiFiProperties.java | 1 - .../nifi/web/api/dto/ControllerServiceDTO.java | 12 +- .../org/apache/nifi/web/api/dto/SnippetDTO.java | 55 +- .../web/api/dto/flow/ProcessGroupFlowDTO.java | 2 +- ...rollerServiceReferencingComponentEntity.java | 43 + ...ollerServiceReferencingComponentsEntity.java | 10 +- .../entity/RemoteProcessGroupPortEntity.java | 3 +- .../web/api/entity/ReportingTaskEntity.java | 3 +- .../nifi/web/api/entity/SnippetEntity.java | 2 +- .../protocol/impl/NodeProtocolSenderImpl.java | 1 + ...ackage: org.apache.nifi.cluster.coordination | 62 + ...ava~eded0de154e2fcb543eda68e510bee06f0779f10 | 62 + .../protocol/message/ProtocolMessage.java | 2 +- .../coordination/ClusterCoordinator.java | 9 + .../heartbeat/CuratorHeartbeatMonitor.java | 376 +++ .../http/EndpointResponseMerger.java | 62 + .../coordination/http/HttpResponseMerger.java | 65 + .../http/StandardHttpResponseMerger.java | 243 ++ .../endpoints/AbstractMultiEntityEndpoint.java | 99 + .../endpoints/AbstractNodeStatusEndpoint.java | 40 + .../endpoints/AbstractSingleEntityEndpoint.java | 115 + .../endpoints/BulletinBoardEndpointMerger.java | 82 + .../endpoints/ComponentStateEndpointMerger.java | 97 + .../ConnectionStatusEndpiontMerger.java | 74 + .../ControllerServiceEndpointMerger.java | 146 + ...ontrollerServiceReferenceEndpointMerger.java | 68 + .../ControllerServicesEndpointMerger.java | 56 + .../ControllerStatusEndpointMerger.java | 83 + .../http/endpoints/CountersEndpointMerger.java | 73 + .../endpoints/DropRequestEndpiontMerger.java | 125 + .../endpoints/FlowSnippetEndpointMerger.java | 103 + .../endpoints/GroupStatusEndpointMerger.java | 89 + .../endpoints/ListFlowFilesEndpointMerger.java | 156 ++ .../endpoints/PortStatusEndpointMerger.java | 75 + .../endpoints/ProcessGroupEndpointMerger.java | 107 + .../http/endpoints/ProcessorEndpointMerger.java | 75 + .../ProcessorStatusEndpointMerger.java | 74 + .../endpoints/ProcessorsEndpointMerger.java | 78 + .../ProvenanceEventEndpointMerger.java | 55 + .../ProvenanceQueryEndpointMerger.java | 187 ++ .../RemoteProcessGroupEndpointMerger.java | 116 + .../RemoteProcessGroupStatusEndpointMerger.java | 74 + .../RemoteProcessGroupsEndpointMerger.java | 78 + .../endpoints/ReportingTaskEndpointMerger.java | 80 + .../endpoints/ReportingTasksEndpointMerger.java | 56 + .../endpoints/StatusHistoryEndpointMerger.java | 222 ++ .../SystemDiagnosticsEndpointMerger.java | 73 + .../http/replication/AsyncClusterResponse.java | 122 + .../http/replication/CompletionCallback.java | 22 + .../replication/RequestCompletionCallback.java | 39 + .../http/replication/RequestReplicator.java | 74 + .../http/replication/ResponseUtils.java | 88 + .../StandardAsyncClusterResponse.java | 263 ++ .../ThreadPoolRequestReplicator.java | 672 +++++ .../nifi/cluster/manager/StatusMerger.java | 6 - .../manager/impl/HttpRequestReplicatorImpl.java | 3 +- .../cluster/manager/impl/WebClusterManager.java | 2493 ++---------------- .../impl/WebClusterManagerCoordinator.java | 20 +- .../spring/WebClusterManagerFactoryBean.java | 6 - .../resources/nifi-cluster-manager-context.xml | 32 - .../heartbeat/TestAbstractHeartbeatMonitor.java | 5 + .../heartbeat/TestCuratorHeartbeatMonitor.java | 355 +++ .../endpoints/TestProcessorEndpointMerger.java | 64 + .../TestStatusHistoryEndpointMerger.java | 51 + .../http/replication/TestResponseUtils.java | 126 + .../TestThreadPoolRequestReplicator.java | 296 +++ .../impl/HttpRequestReplicatorImplTest.java | 2 + .../manager/impl/TestWebClusterManager.java | 52 - .../src/test/resources/conf/nifi.properties | 127 + .../service/ControllerServiceProvider.java | 11 +- .../apache/nifi/controller/FlowController.java | 35 +- .../nifi/controller/StandardFlowService.java | 14 + .../apache/nifi/controller/StandardSnippet.java | 88 +- .../cluster/ClusterProtocolHeartbeater.java | 118 + .../cluster/ZooKeeperClientConfig.java | 25 +- .../cluster/ZooKeeperHeartbeater.java | 117 - .../StandardControllerServiceProvider.java | 34 +- .../history/ConnectionStatusDescriptor.java | 42 +- .../history/ProcessGroupStatusDescriptor.java | 132 +- .../history/ProcessorStatusDescriptor.java | 87 +- .../RemoteProcessGroupStatusDescriptor.java | 93 +- .../VolatileComponentStatusRepository.java | 55 - .../nifi/groups/StandardProcessGroup.java | 95 +- .../nifi/persistence/FlowConfigurationDAO.java | 17 + .../StandardXMLFlowConfigurationDAO.java | 32 +- .../org/apache/nifi/services/FlowService.java | 6 + .../src/main/resources/conf/nifi.properties | 1 - .../org/apache/nifi/audit/SnippetAuditor.java | 28 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 54 +- .../nifi/web/StandardNiFiServiceFacade.java | 1285 +++++---- .../StandardNiFiWebConfigurationContext.java | 12 +- .../nifi/web/api/ApplicationResource.java | 99 +- .../apache/nifi/web/api/ConnectionResource.java | 81 +- .../apache/nifi/web/api/ControllerResource.java | 39 +- .../nifi/web/api/ControllerServiceResource.java | 155 +- .../nifi/web/api/FlowFileQueueResource.java | 1 + .../org/apache/nifi/web/api/FlowResource.java | 4 + .../org/apache/nifi/web/api/FunnelResource.java | 79 +- .../apache/nifi/web/api/InputPortResource.java | 71 +- .../org/apache/nifi/web/api/LabelResource.java | 75 +- .../apache/nifi/web/api/OutputPortResource.java | 75 +- .../nifi/web/api/ProcessGroupResource.java | 160 +- .../apache/nifi/web/api/ProcessorResource.java | 45 +- .../web/api/RemoteProcessGroupResource.java | 115 +- .../nifi/web/api/ReportingTaskResource.java | 65 +- .../apache/nifi/web/api/TemplateResource.java | 5 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 127 +- .../apache/nifi/web/api/dto/EntityFactory.java | 61 + .../nifi/web/controller/ControllerFacade.java | 6 +- .../nifi/web/dao/ControllerServiceDAO.java | 7 +- .../dao/impl/StandardControllerServiceDAO.java | 16 +- .../nifi/web/dao/impl/StandardSnippetDAO.java | 30 +- .../org/apache/nifi/web/util/SnippetUtils.java | 16 +- .../src/main/resources/nifi-web-api-context.xml | 8 +- .../org/apache/nifi/web/FlowModification.java | 4 + .../nifi/web/OptimisticLockingManager.java | 4 + .../web/StandardOptimisticLockingManager.java | 3 + .../nifi/web/revision/DeleteRevisionTask.java | 24 + .../revision/ExpiredRevisionClaimException.java | 28 + .../LockVerificationFailedException.java | 26 + .../nifi/web/revision/NaiveRevisionManager.java | 589 +++++ .../web/revision/ReadOnlyRevisionCallback.java | 34 + .../apache/nifi/web/revision/RevisionClaim.java | 26 + .../nifi/web/revision/RevisionComparator.java | 41 + .../nifi/web/revision/RevisionLockResult.java | 48 + .../nifi/web/revision/RevisionManager.java | 181 ++ .../nifi/web/revision/RevisionUpdate.java | 43 + .../web/revision/StandardRevisionClaim.java | 49 + .../web/revision/StandardRevisionUpdate.java | 66 + .../nifi/web/revision/UpdateRevisionTask.java | 64 + .../web/revision/TestNaiveRevisionManager.java | 379 +++ .../src/main/webapp/js/nf/canvas/nf-settings.js | 6 +- 137 files changed, 10032 insertions(+), 4089 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java b/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java index 93f3327..5fad824 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.controller; -import java.util.Set; +import java.util.Map; + +import org.apache.nifi.web.Revision; /** * A Snippet represents a segment of the flow @@ -44,42 +46,42 @@ public interface Snippet { /** * @return connections in this snippet */ - public Set<String> getConnections(); + public Map<String, Revision> getConnections(); /** * @return funnels in this snippet */ - public Set<String> getFunnels(); + public Map<String, Revision> getFunnels(); /** * @return input ports in this snippet */ - public Set<String> getInputPorts(); + public Map<String, Revision> getInputPorts(); /** * @return output ports in this snippet */ - public Set<String> getOutputPorts(); + public Map<String, Revision> getOutputPorts(); /** * @return labels in this snippet */ - public Set<String> getLabels(); + public Map<String, Revision> getLabels(); /** * @return the identifiers of all ProcessGroups in this Snippet */ - public Set<String> getProcessGroups(); + public Map<String, Revision> getProcessGroups(); /** * @return the identifiers of all Processors in this Snippet */ - public Set<String> getProcessors(); + public Map<String, Revision> getProcessors(); /** * @return the identifiers of all RemoteProcessGroups in this Snippet */ - public Set<String> getRemoteProcessGroups(); + public Map<String, Revision> getRemoteProcessGroups(); /** * @return Determines if this snippet is empty http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java index db16954..f60bda8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java @@ -445,6 +445,7 @@ public class ProcessGroupStatus implements Cloneable { merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes()); merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount()); merged.setProcessingNanos(merged.getProcessingNanos() + statusToMerge.getProcessingNanos()); + merged.setFlowFilesRemoved(merged.getFlowFilesRemoved() + statusToMerge.getFlowFilesRemoved()); // if the status to merge is invalid allow it to take precedence. whether the // processor run status is disabled/stopped/running is part of the flow configuration http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java index 4628a28..d273096 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java @@ -17,12 +17,8 @@ package org.apache.nifi.controller.status.history; import java.util.Date; -import java.util.List; -import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; /** * A repository for storing and retrieving components' historical status @@ -121,29 +117,4 @@ public interface ComponentStatusRepository { * period */ StatusHistory getRemoteProcessGroupStatusHistory(String remoteGroupId, Date start, Date end, int preferredDataPoints); - - /** - * @return a List of all {@link MetricDescriptor}s that are applicable to - * Process Groups - */ - List<MetricDescriptor<ProcessGroupStatus>> getProcessGroupMetricDescriptors(); - - /** - * @return a List of all {@link MetricDescriptor}s that are applicable to - * Processors - */ - List<MetricDescriptor<ProcessorStatus>> getProcessorMetricDescriptors(); - - /** - * @return a List of all {@link MetricDescriptor}s that are applicable to - * Remote Process Groups - */ - List<MetricDescriptor<RemoteProcessGroupStatus>> getRemoteProcessGroupMetricDescriptors(); - - /** - * @return a List of all {@link MetricDescriptor}s that are applicable to - * Connections - */ - List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors(); - } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-api/src/main/java/org/apache/nifi/web/Revision.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java index a4d36d2..0533307 100644 --- a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java +++ b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java @@ -17,10 +17,11 @@ package org.apache.nifi.web; import java.io.Serializable; +import java.util.Objects; /** - * A model object representing a revision. Equality is defined as either a - * matching version number or matching non-empty client IDs. + * A model object representing a revision. Equality is defined as matching + * component ID and either a matching version number or matching non-empty client IDs. * * @Immutable * @Threadsafe @@ -37,9 +38,22 @@ public class Revision implements Serializable { */ private final String clientId; + /** + * the ID of the component that this revision belongs to, or <code>null</code> if + * the revision is not attached to any component but rather is attached to the entire + * data flow. + */ + private final String componentId; + + @Deprecated public Revision(Long revision, String clientId) { + this(revision, clientId, "root"); // TODO: remove this constructor. This is to bridge the gap right now + } + + public Revision(Long revision, String clientId, String componentId) { this.version = revision; this.clientId = clientId; + this.componentId = Objects.requireNonNull(componentId); } public String getClientId() { @@ -50,14 +64,40 @@ public class Revision implements Serializable { return version; } + public String getComponentId() { + return componentId; + } + @Override public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } if ((obj instanceof Revision) == false) { return false; } + // TODO: THIS IS FOR TESTING PURPOSES! DO NOT LET THIS GET CHECKED IN THIS WAY!!!!!!!!!!!! + if (true) { + return true; + } + Revision thatRevision = (Revision) obj; + // ensure that component ID's are the same (including null) + if (thatRevision.getComponentId() == null && getComponentId() != null) { + return false; + } + if (thatRevision.getComponentId() != null && getComponentId() == null) { + return false; + } + if (thatRevision.getComponentId() != null && !thatRevision.getComponentId().equals(getComponentId())) { + return false; + } + if (this.version != null && this.version.equals(thatRevision.version)) { return true; } else { @@ -69,6 +109,7 @@ public class Revision implements Serializable { @Override public int hashCode() { int hash = 5; + hash = 59 * hash + (this.componentId != null ? this.componentId.hashCode() : 0); hash = 59 * hash + (this.version != null ? this.version.hashCode() : 0); hash = 59 * hash + (this.clientId != null ? this.clientId.hashCode() : 0); return hash; @@ -76,6 +117,6 @@ public class Revision implements Serializable { @Override public String toString() { - return "[" + version + ", " + clientId + ']'; + return "[" + version + ", " + clientId + ", " + componentId + ']'; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index a09d0af..4d3f92f 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -462,7 +462,6 @@ language governing permissions and limitations under the License. --> <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout> <nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout> <nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node> - <nifi.zookeeper.access.control>Open</nifi.zookeeper.access.control> <!-- nifi.properties: kerberos properties --> <nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file> http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index a6387ad..63693bf 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -172,7 +172,6 @@ public class NiFiProperties extends Properties { public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout"; public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout"; public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node"; - public static final String ZOOKEEPER_ACCESS_CONTROL = "nifi.zookeeper.access.control"; // cluster manager properties public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager"; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java index f40e181..913ffa0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java @@ -16,12 +16,16 @@ */ package org.apache.nifi.web.api.dto; -import com.wordnik.swagger.annotations.ApiModelProperty; import java.util.Collection; import java.util.Map; import java.util.Set; + import javax.xml.bind.annotation.XmlType; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; + +import com.wordnik.swagger.annotations.ApiModelProperty; + /** * A Controller Service that can be shared by other components */ @@ -41,7 +45,7 @@ public class ControllerServiceDTO extends ComponentDTO { private String customUiUrl; private String annotationData; - private Set<ControllerServiceReferencingComponentDTO> referencingComponents; + private Set<ControllerServiceReferencingComponentEntity> referencingComponents; private Collection<String> validationErrors; @@ -193,11 +197,11 @@ public class ControllerServiceDTO extends ComponentDTO { @ApiModelProperty( value = "All components referencing this controller service." ) - public Set<ControllerServiceReferencingComponentDTO> getReferencingComponents() { + public Set<ControllerServiceReferencingComponentEntity> getReferencingComponents() { return referencingComponents; } - public void setReferencingComponents(Set<ControllerServiceReferencingComponentDTO> referencingComponents) { + public void setReferencingComponents(Set<ControllerServiceReferencingComponentEntity> referencingComponents) { this.referencingComponents = referencingComponents; } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/SnippetDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/SnippetDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/SnippetDTO.java index bf5d319..8362c18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/SnippetDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/SnippetDTO.java @@ -16,11 +16,12 @@ */ package org.apache.nifi.web.api.dto; -import com.wordnik.swagger.annotations.ApiModelProperty; +import java.util.HashMap; +import java.util.Map; import javax.xml.bind.annotation.XmlType; -import java.util.HashSet; -import java.util.Set; + +import com.wordnik.swagger.annotations.ApiModelProperty; /** * The contents of a snippet of a flow. @@ -34,14 +35,14 @@ public class SnippetDTO { private Boolean linked; // when specified these are only considered during creation - private Set<String> processGroups = new HashSet<>(); - private Set<String> remoteProcessGroups = new HashSet<>(); - private Set<String> processors = new HashSet<>(); - private Set<String> inputPorts = new HashSet<>(); - private Set<String> outputPorts = new HashSet<>(); - private Set<String> connections = new HashSet<>(); - private Set<String> labels = new HashSet<>(); - private Set<String> funnels = new HashSet<>(); + private Map<String, RevisionDTO> processGroups = new HashMap<>(); + private Map<String, RevisionDTO> remoteProcessGroups = new HashMap<>(); + private Map<String, RevisionDTO> processors = new HashMap<>(); + private Map<String, RevisionDTO> inputPorts = new HashMap<>(); + private Map<String, RevisionDTO> outputPorts = new HashMap<>(); + private Map<String, RevisionDTO> connections = new HashMap<>(); + private Map<String, RevisionDTO> labels = new HashMap<>(); + private Map<String, RevisionDTO> funnels = new HashMap<>(); /** * @return id of this snippet @@ -109,11 +110,11 @@ public class SnippetDTO { value = "The ids of the connections in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getConnections() { + public Map<String, RevisionDTO> getConnections() { return connections; } - public void setConnections(Set<String> connections) { + public void setConnections(Map<String, RevisionDTO> connections) { this.connections = connections; } @@ -125,11 +126,11 @@ public class SnippetDTO { value = "The ids of the funnels in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getFunnels() { + public Map<String, RevisionDTO> getFunnels() { return funnels; } - public void setFunnels(Set<String> funnels) { + public void setFunnels(Map<String, RevisionDTO> funnels) { this.funnels = funnels; } @@ -141,11 +142,11 @@ public class SnippetDTO { value = "The ids of the input ports in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getInputPorts() { + public Map<String, RevisionDTO> getInputPorts() { return inputPorts; } - public void setInputPorts(Set<String> inputPorts) { + public void setInputPorts(Map<String, RevisionDTO> inputPorts) { this.inputPorts = inputPorts; } @@ -157,11 +158,11 @@ public class SnippetDTO { value = "The ids of the labels in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getLabels() { + public Map<String, RevisionDTO> getLabels() { return labels; } - public void setLabels(Set<String> labels) { + public void setLabels(Map<String, RevisionDTO> labels) { this.labels = labels; } @@ -173,11 +174,11 @@ public class SnippetDTO { value = "The ids of the output ports in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getOutputPorts() { + public Map<String, RevisionDTO> getOutputPorts() { return outputPorts; } - public void setOutputPorts(Set<String> outputPorts) { + public void setOutputPorts(Map<String, RevisionDTO> outputPorts) { this.outputPorts = outputPorts; } @@ -189,11 +190,11 @@ public class SnippetDTO { value = "The ids of the process groups in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getProcessGroups() { + public Map<String, RevisionDTO> getProcessGroups() { return processGroups; } - public void setProcessGroups(Set<String> processGroups) { + public void setProcessGroups(Map<String, RevisionDTO> processGroups) { this.processGroups = processGroups; } @@ -205,11 +206,11 @@ public class SnippetDTO { value = "The ids of the processors in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getProcessors() { + public Map<String, RevisionDTO> getProcessors() { return processors; } - public void setProcessors(Set<String> processors) { + public void setProcessors(Map<String, RevisionDTO> processors) { this.processors = processors; } @@ -221,11 +222,11 @@ public class SnippetDTO { value = "The ids of the remote process groups in this snippet. These ids will be populated within each response. They can be specified when creating a snippet. However, once a snippet " + "has been created its contents cannot be modified (these ids are ignored during update requests)." ) - public Set<String> getRemoteProcessGroups() { + public Map<String, RevisionDTO> getRemoteProcessGroups() { return remoteProcessGroups; } - public void setRemoteProcessGroups(Set<String> remoteProcessGroups) { + public void setRemoteProcessGroups(Map<String, RevisionDTO> remoteProcessGroups) { this.remoteProcessGroups = remoteProcessGroups; } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java index b651efc..f0f56f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/flow/ProcessGroupFlowDTO.java @@ -23,7 +23,7 @@ import javax.xml.bind.annotation.XmlType; /** * The NiFi flow starting at a given Process Group. */ -@XmlType(name = "processGroupflow") +@XmlType(name = "processGroupFlow") public class ProcessGroupFlowDTO { private String id; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentEntity.java new file mode 100644 index 0000000..dccac25 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentEntity.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; + +/** + * A serialized representation of this class can be placed in the entity body of a response to the API. + * This particular entity holds a reference to component that references a controller services. + */ +@XmlRootElement(name = "controllerServiceReferencingComponentEntity") +public class ControllerServiceReferencingComponentEntity extends ComponentEntity { + private ControllerServiceReferencingComponentDTO controllerServiceReferencingComponent; + + /** + * @return controller service referencing components that is being serialized + */ + public ControllerServiceReferencingComponentDTO getControllerServiceReferencingComponent() { + return controllerServiceReferencingComponent; + } + + public void setControllerServiceReferencingComponent(ControllerServiceReferencingComponentDTO controllerServiceReferencingComponent) { + this.controllerServiceReferencingComponent = controllerServiceReferencingComponent; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentsEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentsEntity.java index c6be79d..8c69bff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentsEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ControllerServiceReferencingComponentsEntity.java @@ -17,8 +17,8 @@ package org.apache.nifi.web.api.entity; import java.util.Set; + import javax.xml.bind.annotation.XmlRootElement; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; /** * A serialized representation of this class can be placed in the entity body of a response to the API. This particular entity holds a reference to a list of controller services referencing @@ -27,16 +27,16 @@ import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; @XmlRootElement(name = "controllerServiceReferencingComponentsEntity") public class ControllerServiceReferencingComponentsEntity extends Entity { - private Set<ControllerServiceReferencingComponentDTO> controllerServiceReferencingComponents; + private Set<ControllerServiceReferencingComponentEntity> controllerServiceReferencingComponents; /** - * @return list of controller service referencing components that are being serialized + * @return set of controller service referencing components that are being serialized */ - public Set<ControllerServiceReferencingComponentDTO> getControllerServiceReferencingComponents() { + public Set<ControllerServiceReferencingComponentEntity> getControllerServiceReferencingComponents() { return controllerServiceReferencingComponents; } - public void setControllerServiceReferencingComponents(Set<ControllerServiceReferencingComponentDTO> controllerServiceReferencingComponents) { + public void setControllerServiceReferencingComponents(Set<ControllerServiceReferencingComponentEntity> controllerServiceReferencingComponents) { this.controllerServiceReferencingComponents = controllerServiceReferencingComponents; } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupPortEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupPortEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupPortEntity.java index 9d5189a..6216f95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupPortEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupPortEntity.java @@ -17,13 +17,14 @@ package org.apache.nifi.web.api.entity; import javax.xml.bind.annotation.XmlRootElement; + import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; /** * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a RemoteProcessGroupPortDTO. */ @XmlRootElement(name = "remoteProcessGroupPortEntity") -public class RemoteProcessGroupPortEntity extends Entity { +public class RemoteProcessGroupPortEntity extends ComponentEntity { private RemoteProcessGroupPortDTO remoteProcessGroupPort; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskEntity.java index 3c07dd7..ff6f9c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskEntity.java @@ -17,13 +17,14 @@ package org.apache.nifi.web.api.entity; import javax.xml.bind.annotation.XmlRootElement; + import org.apache.nifi.web.api.dto.ReportingTaskDTO; /** * A serialized representation of this class can be placed in the entity body of a response to the API. This particular entity holds a reference to a reporting task. */ @XmlRootElement(name = "reportingTaskEntity") -public class ReportingTaskEntity extends Entity { +public class ReportingTaskEntity extends ComponentEntity { private ReportingTaskDTO reportingTask; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/SnippetEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/SnippetEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/SnippetEntity.java index d20e61d..205dc17 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/SnippetEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/SnippetEntity.java @@ -23,7 +23,7 @@ import org.apache.nifi.web.api.dto.SnippetDTO; * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a SnippetDTO. */ @XmlRootElement(name = "snippetEntity") -public class SnippetEntity extends Entity { +public class SnippetEntity extends ComponentEntity { private SnippetDTO snippet; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java index 10a58cf..23b1209 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java @@ -94,6 +94,7 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender { } } + @Override public void heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException { final String hostname; final int port; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination new file mode 100644 index 0000000..7a99d0e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * Message to indicate that the status of a node in the cluster has changed + */ +@XmlRootElement(name = "nodeStatusChange") +public class NodeStatusChangeMessage extends ProtocolMessage { + private NodeConnectionStatus connectionStatus; + private NodeIdentifier nodeId; + private Long statusUpdateId = -1L; + + @Override + public MessageType getType() { + return MessageType.NODE_STATUS_CHANGE; + } + + public void setNodeConnectionStatus(final NodeConnectionStatus status) { + this.connectionStatus = status; + } + + public NodeConnectionStatus getNodeConnectionStatus() { + return connectionStatus; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + + public Long getStatusUpdateIdentifier() { + return statusUpdateId; + } + + public void setStatusUpdateIdentifier(Long statusUpdateId) { + this.statusUpdateId = statusUpdateId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~eded0de154e2fcb543eda68e510bee06f0779f10 ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~eded0de154e2fcb543eda68e510bee06f0779f10 b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~eded0de154e2fcb543eda68e510bee06f0779f10 new file mode 100644 index 0000000..7a99d0e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java~eded0de154e2fcb543eda68e510bee06f0779f10 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * Message to indicate that the status of a node in the cluster has changed + */ +@XmlRootElement(name = "nodeStatusChange") +public class NodeStatusChangeMessage extends ProtocolMessage { + private NodeConnectionStatus connectionStatus; + private NodeIdentifier nodeId; + private Long statusUpdateId = -1L; + + @Override + public MessageType getType() { + return MessageType.NODE_STATUS_CHANGE; + } + + public void setNodeConnectionStatus(final NodeConnectionStatus status) { + this.connectionStatus = status; + } + + public NodeConnectionStatus getNodeConnectionStatus() { + return connectionStatus; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + + public Long getStatusUpdateIdentifier() { + return statusUpdateId; + } + + public void setStatusUpdateIdentifier(Long statusUpdateId) { + this.statusUpdateId = statusUpdateId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java index 27be95f..28cef5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -27,11 +27,11 @@ public abstract class ProtocolMessage { EXCEPTION, FLOW_REQUEST, FLOW_RESPONSE, - HEARTBEAT, PING, RECONNECTION_REQUEST, RECONNECTION_RESPONSE, SERVICE_BROADCAST, + HEARTBEAT, NODE_STATUS_CHANGE; } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 59ded24..fa49a62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -17,6 +17,8 @@ package org.apache.nifi.cluster.coordination; +import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; @@ -88,6 +90,13 @@ public interface ClusterCoordinator { Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state); /** + * Returns a Map of NodeConnectionStatus to all Node Identifiers that have that status. + * + * @return the NodeConnectionStatus for each Node in the cluster, grouped by the Connection Status + */ + Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates(); + + /** * Checks if the given hostname is blocked by the configured firewall, returning * <code>true</code> if the node is blocked, <code>false</code> if the node is * allowed through the firewall or if there is no firewall configured http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java new file mode 100644 index 0000000..8114813 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.heartbeat; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryForever; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses Apache Curator to monitor heartbeats from nodes + */ +public class CuratorHeartbeatMonitor implements HeartbeatMonitor { + private static final Logger logger = LoggerFactory.getLogger(CuratorHeartbeatMonitor.class); + private static final Unmarshaller unmarshaller; + + private final ClusterCoordinator clusterCoordinator; + private final ZooKeeperClientConfig zkClientConfig; + private final String heartbeatPath; + private final int heartbeatIntervalMillis; + + private volatile CuratorFramework curatorClient; + private volatile ScheduledFuture<?> future; + private volatile Map<NodeIdentifier, NodeHeartbeat> latestHeartbeatMessages; + private volatile long latestHeartbeatTime; + + private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); + + static { + try { + final JAXBContext jaxbContext = JAXBContext.newInstance(HeartbeatMessage.class); + unmarshaller = jaxbContext.createUnmarshaller(); + } catch (final Exception e) { + throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Heartbeat Messages", e); + } + } + + public CuratorHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) { + this.clusterCoordinator = clusterCoordinator; + this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); + this.heartbeatPath = zkClientConfig.resolvePath("cluster/heartbeats"); + + final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, + NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + + this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); + } + + @Override + public void start() { + final RetryPolicy retryPolicy = new RetryForever(5000); + curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), + zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy); + curatorClient.start(); + + this.future = flowEngine.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + monitorHeartbeats(); + } catch (final Exception e) { + clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString()); + logger.error("Failed to process heartbeats", e); + } + } + }, heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS); + } + + private CuratorFramework getClient() { + return curatorClient; + } + + @Override + public void stop() { + final CuratorFramework client = getClient(); + if (client != null) { + client.close(); + } + + if (future != null) { + future.cancel(true); + } + } + + @Override + public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) { + return latestHeartbeatMessages.get(nodeId); + } + + + /** + * Fetches all of the latest heartbeats from ZooKeeper + * and updates the Cluster Coordinator as appropriate, + * based on the heartbeats received. + * + * Visible for testing. + */ + synchronized void monitorHeartbeats() { + final StopWatch fetchStopWatch = new StopWatch(true); + final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = fetchHeartbeats(); + if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { + // failed to fetch heartbeats from ZooKeeper; don't change anything. + clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to retrieve any new heartbeat information for nodes from ZooKeeper. " + + "Will not make any decisions based on heartbeats."); + return; + } + + this.latestHeartbeatMessages = new HashMap<>(latestHeartbeats); + fetchStopWatch.stop(); + + final StopWatch procStopWatch = new StopWatch(true); + for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) { + try { + processHeartbeat(heartbeat); + } catch (final Exception e) { + clusterCoordinator.reportEvent(null, Severity.ERROR, + "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e); + logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString()); + logger.error("", e); + } + } + + procStopWatch.stop(); + logger.info("Finished processing {} heartbeats in {} (fetch took an additional {})", + latestHeartbeats.size(), procStopWatch.getDuration(), fetchStopWatch.getDuration()); + + // Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval) + for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) { + final long maxMillis = heartbeatIntervalMillis * 1000L * 8; + final long threshold = latestHeartbeatTime - maxMillis; + + if (heartbeat.getTimestamp() < threshold) { + final int differenceSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(latestHeartbeatTime - heartbeat.getTimestamp()); + + clusterCoordinator.requestNodeDisconnect(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, + "Node has not sent a heartbeat to ZooKeeper in " + differenceSeconds + " seconds"); + + try { + removeHeartbeat(heartbeat.getNodeIdentifier()); + } catch (final Exception e) { + logger.warn("Failed to remove heartbeat for {} due to {}", heartbeat.getNodeIdentifier(), e.toString()); + logger.warn("", e); + } + } + } + } + + + private void processHeartbeat(final NodeHeartbeat heartbeat) { + final NodeIdentifier nodeId = heartbeat.getNodeIdentifier(); + + // Do not process heartbeat if it's blocked by firewall. + if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) { + clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request."); + + // request node to disconnect + clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall"); + removeHeartbeat(nodeId); + return; + } + + final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(nodeId); + if (connectionStatus == null) { + final NodeConnectionState hbConnectionState = heartbeat.getConnectionStatus().getState(); + if (hbConnectionState == NodeConnectionState.DISCONNECTED || hbConnectionState == NodeConnectionState.DISCONNECTING) { + // Node is not part of the cluster. Remove heartbeat and move on. + removeHeartbeat(nodeId); + return; + } + + // Unknown node. Issue reconnect request + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from unknown node. Removing heartbeat and requesting that node connect to cluster."); + removeHeartbeat(nodeId); + + clusterCoordinator.requestNodeConnect(nodeId); + return; + } + + final DisconnectionCode reportedDisconnectCode = heartbeat.getConnectionStatus().getDisconnectCode(); + if (reportedDisconnectCode != null) { + // Check if the node is notifying us that it wants to disconnect from the cluster + final boolean requestingDisconnect; + switch (reportedDisconnectCode) { + case MISMATCHED_FLOWS: + case NODE_SHUTDOWN: + case STARTUP_FAILURE: + final NodeConnectionState expectedState = connectionStatus.getState(); + requestingDisconnect = expectedState == NodeConnectionState.CONNECTED || expectedState == NodeConnectionState.CONNECTING; + break; + default: + requestingDisconnect = false; + break; + } + + if (requestingDisconnect) { + clusterCoordinator.disconnectionRequestedByNode(nodeId, heartbeat.getConnectionStatus().getDisconnectCode(), + heartbeat.getConnectionStatus().getDisconnectReason()); + removeHeartbeat(nodeId); + return; + } + } + + final NodeConnectionState connectionState = connectionStatus.getState(); + if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) { + // Cluster Coordinator believes that node is connected, but node does not believe so. + clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster," + + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState() + + "). Marking as Disconnected and requesting that Node reconnect to cluster"); + clusterCoordinator.requestNodeConnect(nodeId); + return; + } + + if (NodeConnectionState.DISCONNECTED == connectionState) { + // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is + // the only node. We allow it if it is the only node because if we have a one-node cluster, then + // we cannot manually reconnect it. + final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode(); + + if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) { + // record event + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " + + "disconnected due to lack of heartbeat. Issuing reconnection request."); + + clusterCoordinator.requestNodeConnect(nodeId); + } else { + // disconnected nodes should not heartbeat, so we need to issue a disconnection request + logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request."); + clusterCoordinator.requestNodeDisconnect(nodeId, connectionStatus.getDisconnectCode(), connectionStatus.getDisconnectReason()); + removeHeartbeat(nodeId); + } + + return; + } + + if (NodeConnectionState.DISCONNECTING == connectionStatus.getState()) { + // ignore spurious heartbeat + removeHeartbeat(nodeId); + return; + } + + // first heartbeat causes status change from connecting to connected + if (NodeConnectionState.CONNECTING == connectionState) { + final Long connectionRequestTime = connectionStatus.getConnectionRequestTime(); + if (connectionRequestTime != null && heartbeat.getTimestamp() < connectionRequestTime) { + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect."); + removeHeartbeat(nodeId); + return; + } + + // connection complete + clusterCoordinator.finishNodeConnection(nodeId); + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected."); + } + + if (heartbeat.isPrimary()) { + clusterCoordinator.setPrimaryNode(nodeId); + } + } + + + /** + * Fetches the latest heartbeats for each node from ZooKeeper. + * Visible for testing + */ + Map<NodeIdentifier, NodeHeartbeat> fetchHeartbeats() { + logger.debug("Fetching heartbeats from ZooKeeper"); + final List<String> nodeIds; + try { + nodeIds = curatorClient.getChildren().forPath(heartbeatPath); + } catch (final NoNodeException nne) { + logger.info("Could not find any heartbeats in ZooKeeper because the ZNode " + heartbeatPath + " does not exist"); + return null; + } catch (final Exception e) { + logger.error("Failed to obtain heartbeats from ZooKeeper due to {}", e); + logger.error("", e); + clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to obtain heartbeats from ZooKeeper due to " + e); + + return null; + } + + logger.debug("Found {} nodes that have emitted heartbeats to ZooKeeper", nodeIds.size()); + final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = new HashMap<>(nodeIds.size()); + for (final String nodeId : nodeIds) { + final HeartbeatMessage heartbeatMsg; + + final String nodeHeartbeatPath = heartbeatPath + "/" + nodeId; + final Stat stat = new Stat(); + try { + final byte[] serializedHeartbeatMsg = getClient().getData().storingStatIn(stat).forPath(nodeHeartbeatPath); + heartbeatMsg = (HeartbeatMessage) unmarshaller.unmarshal(new ByteArrayInputStream(serializedHeartbeatMsg)); + } catch (final Exception e) { + logger.error("Failed to obtain heartbeat from ZooKeeper for Node with ID {} due to {}", nodeId, e); + logger.error("", e); + clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to obtain heartbeat for Node with ID " + nodeId + " from ZooKeeper due to " + e); + + continue; + } + + // update timestamp to be the timestamp that ZooKeeper reports + final long lastModifiedTime = stat.getMtime(); + if (lastModifiedTime > latestHeartbeatTime) { + latestHeartbeatTime = lastModifiedTime; + } + + latestHeartbeats.put(heartbeatMsg.getHeartbeat().getNodeIdentifier(), StandardNodeHeartbeat.fromHeartbeatMessage(heartbeatMsg, lastModifiedTime)); + logger.debug("Received heartbeat from Node {}", nodeId); + } + + logger.debug("Fetched {} heartbeats from ZooKeeper", latestHeartbeats.size()); + return latestHeartbeats; + } + + @Override + public synchronized void removeHeartbeat(final NodeIdentifier nodeId) { + logger.debug("Deleting heartbeat for node {}", nodeId); + final String nodeHeartbeatPath = heartbeatPath + "/" + nodeId.getId(); + + latestHeartbeatMessages.remove(nodeId); + + try { + getClient().delete().forPath(nodeHeartbeatPath); + logger.info("Removed heartbeat from ZooKeeper for Node {}", nodeId); + } catch (final NoNodeException e) { + // node did not exist. Just return. + logger.debug("Attempted to remove heartbeat for Node with ID {} but no ZNode existed at {}", nodeId, nodeHeartbeatPath); + return; + } catch (final Exception e) { + logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} due to {}", nodeId, e); + logger.warn("", e); + + clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed to remove node's heartbeat from ZooKeeper due to " + e); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/EndpointResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/EndpointResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/EndpointResponseMerger.java new file mode 100644 index 0000000..7b5affe --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/EndpointResponseMerger.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http; + +import java.net.URI; +import java.util.Set; + +import org.apache.nifi.cluster.manager.NodeResponse; + +/** + * <p> + * Maps a set of NodeResponses to a single NodeResponse for a specific REST Endpoint. + * </p> + * + * <p> + * Implementations of this interface MUST be Thread-Safe. + * </p> + */ +public interface EndpointResponseMerger { + + /** + * Indicates whether or not this EndpointResponseMapper can handle mapping responses + * for the given URI and HTTP Method + * + * @param uri the URI of the endpoint + * @param method the HTTP Method used to interact with the endpoint + * + * @return <code>true</code> if the EndpointResponseMapper can handle mapping responses + * for the endpoint described by the given URI and HTTP Method + */ + boolean canHandle(URI uri, String method); + + /** + * Maps the given Node Responses to a single NodeResponse that is appropriate to return + * to the client/user + * + * @param uri the URI of the REST Endpoint + * @param method the HTTP Method used to interact with the REST Endpoint + * @param successfulResponses the responses from nodes that were successful in handling the request + * @param problematicResponses the responses from nodes that were not successful in handling the request + * @param clientResponse the response that was chosen to be returned to the client + * + * @return a NodeResponse that is appropriate to return to the client/user + */ + NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java new file mode 100644 index 0000000..6102b74 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http; + +import java.net.URI; +import java.util.Set; + +import org.apache.nifi.cluster.manager.NodeResponse; + +/** + * <p> + * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster + * and distilling them down to a single response that would be appropriate to respond with, to the + * user/client who made the original web requests. + * </p> + */ +public interface HttpResponseMerger { + + /** + * Maps the responses from all nodes in the cluster to a single NodeResponse object that + * is appropriate to respond with + * + * @param uri the URI of the web request that was made + * @param httpMethod the HTTP Method that was used when making the request + * @param nodeResponses the responses received from the individual nodes + * + * @return a single NodeResponse that represents the response that should be returned to the user/client + */ + NodeResponse mergeResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses); + + /** + * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses + * that indicate that the node was unable to fulfill the request + * + * @param allResponses the responses to filter + * + * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses + * that indicate that the node was unable to fulfill the request + */ + Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses); + + /** + * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them + * + * @param uri the URI of the request + * @param httpMethod the HTTP Method of the request + * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise + */ + boolean isResponseInterpreted(URI uri, String httpMethod); +}
