NIFI-5516: Implement Load-Balanced Connections Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added documentation, cleaned up code some Refactored FlowFileQueue so that there is SwappablePriorityQueue Several unit tests written Added REST API Endpoint to allow PUT to update connection to use load balancing or not. When enabling load balancing, though, I saw the queue size go from 9 to 18. Then was only able to process 9 FlowFiles. Bug fixes Code refactoring Added integration tests, bug fixes Refactored clients to use NIO Bug fixes. Appears to finally be working with NIO Client!!!!! NIFI-5516: Refactored some code from NioAsyncLoadBalanceClient to LoadBalanceSession Bug fixes and allowed load balancing socket connections to be reused Implemented ability to compress Nothing, Attributes, or Content + Attributes when performing load-balancing Added flag to ConnectionDTO to indicate Load Balance Status Updated Diagnostics DTO for connections Store state about cluster topology in NodeClusterCoordinator so that the state is known upon restart Code cleanup Fixed checkstyle and unit tests NIFI-5516: Updating logic for Cluster Node Firewall so that the node's identity comes from its certificate, not from whatever it says it is. NIFI-5516: FIxed missing License headers NIFI-5516: Some minor code cleanup NIFI-5516: Adddressed review feedback; Bug fixes; some code cleanup. Changed dependency on nifi-registry from SNAPSHOT to official 0.3.0 release NIFI-5516: Take backpressure configuration into account NIFI-5516: Fixed ConnectionDiagnosticsSnapshot to include node identifier NIFI-5516: Addressed review feedback
This closes #2947 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/619f1ffe Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/619f1ffe Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/619f1ffe Branch: refs/heads/master Commit: 619f1ffe8fbbca61bc5545f13920190a77006e08 Parents: 5872eb3 Author: Mark Payne <[email protected]> Authored: Thu Jun 14 11:57:21 2018 -0400 Committer: Jeff Storck <[email protected]> Committed: Thu Oct 4 16:11:05 2018 -0400 ---------------------------------------------------------------------- .../apache/nifi/controller/queue/QueueSize.java | 4 + .../java/org/apache/nifi/flowfile/FlowFile.java | 4 + .../org/apache/nifi/util/NiFiProperties.java | 31 + .../org/apache/nifi/stream/io/StreamUtils.java | 34 +- .../src/main/asciidoc/administration-guide.adoc | 7 + .../nifi/controller/queue/FlowFileQueue.java | 81 +- .../queue/LoadBalanceCompression.java | 35 + .../controller/queue/LoadBalanceStrategy.java | 41 + .../queue/LoadBalancedFlowFileQueue.java | 69 + .../queue/LocalQueuePartitionDiagnostics.java | 32 + .../nifi/controller/queue/QueueDiagnostics.java | 28 + .../queue/RemoteQueuePartitionDiagnostics.java | 30 + .../repository/FlowFileSwapManager.java | 27 +- .../apache/nifi/web/api/dto/ConnectionDTO.java | 49 + .../diagnostics/ConnectionDiagnosticsDTO.java | 127 +- .../ConnectionDiagnosticsSnapshotDTO.java | 76 + .../dto/diagnostics/LocalQueuePartitionDTO.java | 136 ++ .../diagnostics/ProcessorDiagnosticsDTO.java | 10 +- .../diagnostics/RemoteQueuePartitionDTO.java | 126 ++ .../coordination/ClusterCoordinator.java | 26 +- .../ClusterTopologyEventListener.java | 29 + .../nifi/cluster/protocol/NodeIdentifier.java | 59 +- .../nifi/cluster/protocol/ProtocolHandler.java | 5 +- .../protocol/impl/SocketProtocolListener.java | 64 +- .../jaxb/message/AdaptedNodeIdentifier.java | 18 + .../jaxb/message/NodeIdentifierAdapter.java | 4 +- .../protocol/message/ProtocolMessage.java | 19 - .../resources/nifi-cluster-protocol-context.xml | 14 +- .../impl/testutils/DelayedProtocolHandler.java | 8 +- .../testutils/ReflexiveProtocolHandler.java | 8 +- .../heartbeat/AbstractHeartbeatMonitor.java | 3 +- .../ClusterProtocolHeartbeatMonitor.java | 23 +- .../node/NodeClusterCoordinator.java | 292 +++- .../node/state/NodeIdentifierDescriptor.java | 167 ++ .../cluster/manager/ConnectionEntityMerger.java | 18 + .../ProcessorDiagnosticsEntityMerger.java | 149 +- .../flow/TestPopularVoteFlowElection.java | 2 +- .../heartbeat/TestAbstractHeartbeatMonitor.java | 15 +- .../http/StandardHttpResponseMapperSpec.groovy | 6 +- .../CurrentUserEndpointMergerTest.java | 4 +- .../StatusHistoryEndpointMergerSpec.groovy | 3 +- .../node/TestNodeClusterCoordinator.java | 82 +- .../integration/ClusterConnectionIT.java | 2 +- .../apache/nifi/cluster/integration/Node.java | 34 +- .../manager/ConnectionEntityMergerSpec.groovy | 2 +- .../ControllerServiceEntityMergerSpec.groovy | 2 +- .../manager/LabelEntityMergerSpec.groovy | 2 +- .../nifi/controller/DropFlowFileRequest.java | 111 -- .../controller/queue/DropFlowFileRequest.java | 107 ++ .../repository/ContentNotFoundException.java | 16 + .../nifi/connectable/StandardConnection.java | 103 +- .../nifi/controller/FileSystemSwapManager.java | 152 +- .../apache/nifi/controller/FlowController.java | 142 +- .../nifi/controller/StandardFlowFileQueue.java | 1572 ------------------ .../nifi/controller/StandardFlowService.java | 4 +- .../controller/StandardFlowSynchronizer.java | 119 +- .../nifi/controller/StandardProcessorNode.java | 11 +- .../controller/queue/AbstractFlowFileQueue.java | 460 +++++ .../queue/BlockingSwappablePriorityQueue.java | 84 + .../queue/ConnectionEventListener.java | 24 + .../controller/queue/DropFlowFileAction.java | 27 + .../queue/DropFlowFileRepositoryRecord.java | 91 + .../controller/queue/FlowFileQueueContents.java | 46 + .../controller/queue/FlowFileQueueFactory.java | 22 + .../controller/queue/FlowFileQueueSize.java | 94 ++ .../nifi/controller/queue/MaxQueueSize.java | 47 + .../queue/NopConnectionEventListener.java | 29 + .../nifi/controller/queue/QueuePrioritizer.java | 90 + .../controller/queue/StandardFlowFileQueue.java | 213 +++ .../StandardLocalQueuePartitionDiagnostics.java | 60 + .../queue/StandardQueueDiagnostics.java | 40 + ...StandardRemoteQueuePartitionDiagnostics.java | 53 + .../queue/SwappablePriorityQueue.java | 990 +++++++++++ .../nifi/controller/queue/TimePeriod.java | 41 + .../ContentRepositoryFlowFileAccess.java | 91 + .../queue/clustered/FlowFileContentAccess.java | 29 + .../queue/clustered/SimpleLimitThreshold.java | 42 + .../SocketLoadBalancedFlowFileQueue.java | 1024 ++++++++++++ .../queue/clustered/TransactionThreshold.java | 26 + .../clustered/TransferFailureDestination.java | 51 + .../client/LoadBalanceFlowFileCodec.java | 27 + .../StandardLoadBalanceFlowFileCodec.java | 50 + .../client/async/AsyncLoadBalanceClient.java | 49 + .../async/AsyncLoadBalanceClientFactory.java | 24 + .../async/AsyncLoadBalanceClientRegistry.java | 32 + .../async/TransactionCompleteCallback.java | 26 + .../async/TransactionFailureCallback.java | 44 + .../client/async/nio/LoadBalanceSession.java | 641 +++++++ .../async/nio/NioAsyncLoadBalanceClient.java | 473 ++++++ .../nio/NioAsyncLoadBalanceClientFactory.java | 50 + .../nio/NioAsyncLoadBalanceClientRegistry.java | 122 ++ .../nio/NioAsyncLoadBalanceClientTask.java | 107 ++ .../clustered/client/async/nio/PeerChannel.java | 358 ++++ .../client/async/nio/RegisteredPartition.java | 75 + .../CorrelationAttributePartitioner.java | 61 + .../partition/FirstNodePartitioner.java | 43 + .../partition/FlowFilePartitioner.java | 53 + .../partition/LocalPartitionPartitioner.java | 42 + .../partition/LocalQueuePartition.java | 108 ++ .../clustered/partition/QueuePartition.java | 102 ++ .../partition/RebalancingPartition.java | 45 + .../partition/RemoteQueuePartition.java | 352 ++++ .../partition/RoundRobinPartitioner.java | 44 + .../partition/StandardRebalancingPartition.java | 222 +++ .../SwappablePriorityQueueLocalPartition.java | 175 ++ .../protocol/LoadBalanceProtocolConstants.java | 46 + .../server/ClusterLoadBalanceAuthorizer.java | 67 + .../server/ConnectionLoadBalanceServer.java | 251 +++ .../clustered/server/LoadBalanceAuthorizer.java | 24 + .../clustered/server/LoadBalanceProtocol.java | 35 + .../server/NotAuthorizedException.java | 26 + .../server/StandardLoadBalanceProtocol.java | 614 +++++++ .../server/TransactionAbortedException.java | 30 + .../repository/RepositoryContext.java | 3 - .../repository/StandardProcessSession.java | 10 +- .../serialization/FlowFromDOMFactory.java | 23 +- .../serialization/StandardFlowSerializer.java | 41 +- .../manager/StandardStateManagerProvider.java | 32 +- .../nifi/fingerprint/FingerprintFactory.java | 41 +- .../nifi/groups/StandardProcessGroup.java | 40 +- .../flow/mapping/NiFiRegistryFlowMapper.java | 30 +- .../src/main/resources/FlowConfiguration.xsd | 88 +- .../nifi/controller/MockFlowFileRecord.java | 139 ++ .../apache/nifi/controller/MockSwapManager.java | 178 ++ .../controller/TestStandardFlowFileQueue.java | 354 +--- .../queue/clustered/LoadBalancedQueueIT.java | 1345 +++++++++++++++ .../MockTransferFailureDestination.java | 62 + .../TestContentRepositoryFlowFileAccess.java | 130 ++ .../clustered/TestNaiveLimitThreshold.java | 60 + .../TestSocketLoadBalancedFlowFileQueue.java | 514 ++++++ .../clustered/TestSwappablePriorityQueue.java | 471 ++++++ .../async/nio/TestLoadBalanceSession.java | 273 +++ .../server/TestStandardLoadBalanceProtocol.java | 656 ++++++++ .../repository/TestStandardProcessSession.java | 105 +- .../TestWriteAheadFlowFileRepository.java | 154 +- .../src/test/resources/localhost-ks.jks | Bin 0 -> 3076 bytes .../src/test/resources/localhost-ts.jks | Bin 0 -> 911 bytes .../src/test/resources/logback-test.xml | 6 +- .../repository/claim/StandardResourceClaim.java | 4 +- .../nifi-framework/nifi-resources/pom.xml | 7 + .../src/main/resources/conf/nifi.properties | 7 + .../org/apache/nifi/web/api/dto/DtoFactory.java | 100 +- .../web/dao/impl/StandardConnectionDAO.java | 14 + nifi-nar-bundles/nifi-framework-bundle/pom.xml | 12 +- 144 files changed, 14589 insertions(+), 2746 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java index 35a860b..50157e2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java @@ -63,6 +63,10 @@ public class QueueSize { return new QueueSize(objectCount + other.getObjectCount(), totalSizeBytes + other.getByteCount()); } + public QueueSize add(final int count, final long bytes) { + return new QueueSize(objectCount + count, totalSizeBytes + bytes); + } + @Override public String toString() { return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]"; http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java index 7d0e27e..c9eb49a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java +++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java @@ -121,5 +121,9 @@ public interface FlowFile extends Comparable<FlowFile> { } return key; } + + public static boolean isValid(final String key) { + return key != null && !key.trim().isEmpty(); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 4d4f483..9ded1a1 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -202,6 +202,13 @@ public abstract class NiFiProperties { public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time"; public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates"; + // cluster load balance properties + public static final String LOAD_BALANCE_ADDRESS = "nifi.cluster.load.balance.address"; + public static final String LOAD_BALANCE_PORT = "nifi.cluster.load.balance.port"; + public static final String LOAD_BALANCE_CONNECTIONS_PER_NODE = "nifi.cluster.load.balance.connections.per.node"; + public static final String LOAD_BALANCE_MAX_THREAD_COUNT = "nifi.cluster.load.balance.max.thread.count"; + public static final String LOAD_BALANCE_COMMS_TIMEOUT = "nifi.cluster.load.balance.comms.timeout"; + // zookeeper properties public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string"; public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout"; @@ -285,6 +292,13 @@ public abstract class NiFiProperties { public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs"; public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins"; + // cluster load balance defaults + public static final int DEFAULT_LOAD_BALANCE_PORT = 6342; + public static final int DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE = 4; + public static final int DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT = 8; + public static final String DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT = "30 sec"; + + // state management defaults public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml"; @@ -734,6 +748,23 @@ public abstract class NiFiProperties { } } + public InetSocketAddress getClusterLoadBalanceAddress() { + try { + String address = getProperty(LOAD_BALANCE_ADDRESS); + if (StringUtils.isBlank(address)) { + address = getProperty(CLUSTER_NODE_ADDRESS); + } + if (StringUtils.isBlank(address)) { + address = "localhost"; + } + + final int port = getIntegerProperty(LOAD_BALANCE_PORT, DEFAULT_LOAD_BALANCE_PORT); + return InetSocketAddress.createUnresolved(address, port); + } catch (final Exception e) { + throw new RuntimeException("Invalid load balance address/port due to: " + e, e); + } + } + public Integer getClusterNodeProtocolPort() { try { return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT)); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java index 64f6eaa..dca3d0c 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.stream.io; +import org.apache.nifi.stream.io.exception.BytePatternNotFoundException; +import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -23,9 +26,6 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import org.apache.nifi.stream.io.exception.BytePatternNotFoundException; -import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; - public class StreamUtils { public static long copy(final InputStream source, final OutputStream destination) throws IOException { @@ -102,6 +102,34 @@ public class StreamUtils { } /** + * Reads <code>byteCount</code> bytes of data from the given InputStream, writing to the provided byte array. + * + * @param source the InputStream to read from + * @param destination the destination for the data + * @param byteCount the number of bytes to copy + * + * @throws IllegalArgumentException if the given byte array is smaller than <code>byteCount</code> elements. + * @throws EOFException if the InputStream does not have <code>byteCount</code> bytes in the InputStream + * @throws IOException if unable to read from the InputStream + */ + public static void read(final InputStream source, final byte[] destination, final int byteCount) throws IOException { + if (destination.length < byteCount) { + throw new IllegalArgumentException(); + } + + int bytesRead = 0; + int len; + while (bytesRead < byteCount) { + len = source.read(destination, bytesRead, byteCount - bytesRead); + if (len < 0) { + throw new EOFException("Expected to consume " + byteCount + " bytes but consumed only " + bytesRead); + } + + bytesRead += len; + } + } + + /** * Copies data from in to out until either we are out of data (returns null) or we hit one of the byte patterns identified by the <code>stoppers</code> parameter (returns the byte pattern * matched). The bytes in the stopper will be copied. * http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index d4c4fc9..05aeff7 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -3934,6 +3934,13 @@ from the remote node before considering the communication with the node a failur to the cluster. It provides an additional layer of security. This value is blank by default, meaning that no firewall file is to be used. |`nifi.cluster.flow.election.max.wait.time`|Specifies the amount of time to wait before electing a Flow as the "correct" Flow. If the number of Nodes that have voted is equal to the number specified by the `nifi.cluster.flow.election.max.candidates` property, the cluster will not wait this long. The default value is `5 mins`. Note that the time starts as soon as the first vote is cast. |`nifi.cluster.flow.election.max.candidates`|Specifies the number of Nodes required in the cluster to cause early election of Flows. This allows the Nodes in the cluster to avoid having to wait a long time before starting processing if we reach at least this number of nodes in the cluster. +|`nifi.cluster.flow.election.max.wait.time`|Specifies the amount of time to wait before electing a Flow as the "correct" Flow. If the number of Nodes that have voted is equal to the number specified + by the `nifi.cluster.flow.election.max.candidates` property, the cluster will not wait this long. The default value is `5 mins`. Note that the time starts as soon as the first vote is cast. +|`nifi.cluster.flow.election.max.candidates`|Specifies the number of Nodes required in the cluster to cause early election of Flows. This allows the Nodes in the cluster to avoid having to wait a +long time before starting processing if we reach at least this number of nodes in the cluster. +|`nifi.cluster.load.balance.port`|Specifies the port to listen on for incoming connections for load balancing data across the cluster. The default value is `6342`. +|`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for incoming connections for load balancing data across the cluster. If not specified, will default to the value used by the `nifi +.cluster.node.address` property. |==== [[claim_management]] http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 9e637b0..2c7f55b 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -16,18 +16,17 @@ */ package org.apache.nifi.controller.queue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.FlowFileFilter; + import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.SwapSummary; -import org.apache.nifi.flowfile.FlowFilePrioritizer; -import org.apache.nifi.processor.FlowFileFilter; - public interface FlowFileQueue { /** @@ -59,8 +58,6 @@ public interface FlowFileQueue { */ void purgeSwapFiles(); - int getSwapFileCount(); - /** * Resets the comparator used by this queue to maintain order. * @@ -108,33 +105,21 @@ public interface FlowFileQueue { */ boolean isActiveQueueEmpty(); - /** - * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile - * is considered to be unacknowledged if it has been pulled from the queue by some component - * but the session that pulled the FlowFile has not yet been committed or rolled back. - * - * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. - */ - QueueSize getUnacknowledgedQueueSize(); - - QueueSize getActiveQueueSize(); - - QueueSize getSwapQueueSize(); - void acknowledge(FlowFileRecord flowFile); void acknowledge(Collection<FlowFileRecord> flowFiles); /** + * @return <code>true</code> if at least one FlowFile is unacknowledged, <code>false</code> if all FlowFiles that have been dequeued have been acknowledged + */ + boolean isUnacknowledgedFlowFile(); + + /** * @return true if maximum queue size has been reached or exceeded; false * otherwise */ boolean isFull(); - boolean isAnyActiveFlowFilePenalized(); - - boolean isAllActiveFlowFilesPenalized(); - /** * places the given file into the queue * @@ -163,18 +148,6 @@ public interface FlowFileQueue { */ List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords); - /** - * Drains flow files from the given source queue into the given destination - * list. - * - * @param sourceQueue queue to drain from - * @param destination Collection to drain to - * @param maxResults max number to drain - * @param expiredRecords for expired records - * @return size (bytes) of flow files drained from queue - */ - long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords); - List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); String getFlowFileExpiration(); @@ -187,7 +160,7 @@ public interface FlowFileQueue { * Initiates a request to drop all FlowFiles in this queue. This method returns * a DropFlowFileStatus that can be used to determine the current state of the request. * Additionally, the DropFlowFileStatus provides a request identifier that can then be - * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)} + * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileRequest(String)} * methods in order to obtain the status later or cancel a request * * @param requestIdentifier the identifier of the Drop FlowFile Request @@ -200,7 +173,7 @@ public interface FlowFileQueue { /** * Returns the current status of a Drop FlowFile Request that was initiated via the - * {@link #dropFlowFiles()} method that has the given identifier + * {@link #dropFlowFiles(String, String)} method that has the given identifier * * @param requestIdentifier the identifier of the Drop FlowFile Request * @return the status for the request with the given identifier, or <code>null</code> if no @@ -244,7 +217,7 @@ public interface FlowFileQueue { ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults); /** - * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)} + * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String, int)} * method that has the given identifier * * @param requestIdentifier the identifier of the Drop FlowFile Request @@ -282,4 +255,32 @@ public interface FlowFileQueue { * @throws IllegalStateException if the queue is not in a state in which a listing can be performed */ void verifyCanList() throws IllegalStateException; + + /** + * Returns diagnostic information about the queue + */ + QueueDiagnostics getQueueDiagnostics(); + + void lock(); + + void unlock(); + + void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String partitioningAttribute); + + LoadBalanceStrategy getLoadBalanceStrategy(); + + void setLoadBalanceCompression(LoadBalanceCompression compression); + + LoadBalanceCompression getLoadBalanceCompression(); + + String getPartitioningAttribute(); + + void startLoadBalancing(); + + void stopLoadBalancing(); + + /** + * @return <code>true</code> if the queue is actively transferring data to another node, <code>false</code> otherwise + */ + boolean isActivelyLoadBalancing(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceCompression.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceCompression.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceCompression.java new file mode 100644 index 0000000..95c0b6f --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceCompression.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public enum LoadBalanceCompression { + /** + * FlowFiles will not be compressed + */ + DO_NOT_COMPRESS, + + /** + * FlowFiles' attributes will be compressed, but the FlowFiles' contents will not be + */ + COMPRESS_ATTRIBUTES_ONLY, + + /** + * FlowFiles' attributes and content will be compressed + */ + COMPRESS_ATTRIBUTES_AND_CONTENT; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceStrategy.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceStrategy.java new file mode 100644 index 0000000..3053548 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalanceStrategy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public enum LoadBalanceStrategy { + /** + * Do not load balance FlowFiles between nodes in the cluster. + */ + DO_NOT_LOAD_BALANCE, + + /** + * Determine which node to send a given FlowFile to based on the value of a user-specified FlowFile Attribute. + * All FlowFiles that have the same value for said Attribute will be sent to the same node in the cluster. + */ + PARTITION_BY_ATTRIBUTE, + + /** + * FlowFiles will be distributed to nodes in the cluster in a Round-Robin fashion. + */ + ROUND_ROBIN, + + /** + * All FlowFiles will be sent to the same node. Which node they are sent to is not defined. + */ + SINGLE_NODE; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java new file mode 100644 index 0000000..f0eff27 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +import java.util.Collection; + +public interface LoadBalancedFlowFileQueue extends FlowFileQueue { + /** + * Adds the given FlowFiles to this queue, as they have been received from another node in the cluster + * @param flowFiles the FlowFiles received from the peer + */ + void receiveFromPeer(Collection<FlowFileRecord> flowFiles); + + /** + * Distributes the given FlowFiles to the appropriate partitions. Unlike the {@link #putAll(Collection)} method, + * this does not alter the size of the FlowFile Queue itself, as it is intended only to place the FlowFiles into + * their appropriate partitions + * + * @param flowFiles the FlowFiles to distribute + */ + void distributeToPartitions(Collection<FlowFileRecord> flowFiles); + + /** + * Notifies the queue that the given FlowFiles have been successfully transferred to another node + * @param flowFiles the FlowFiles that were transferred + */ + void onTransfer(Collection<FlowFileRecord> flowFiles); + + /** + * Notifies the queue the a transaction containing the given FlowFiles was aborted + * @param flowFiles the FlowFiles in the transaction + */ + void onAbort(Collection<FlowFileRecord> flowFiles); + + /** + * Handles updating the repositories for the given FlowFiles, which have been expired + * @param flowFiles the expired FlowFiles + */ + void handleExpiredRecords(Collection<FlowFileRecord> flowFiles); + + /** + * There are times when we want to ensure that if a node in the cluster reaches the point where backpressure is engaged, that we + * honor that backpressure and do not attempt to load balance from a different node in the cluster to that node. There are other times + * when we may want to push data to the remote node even though it has already reached its backpressure threshold. This method indicates + * whether or not we want to propagate that backpressure indicator across the cluster. + * + * @return <code>true</code> if backpressure on Node A should prevent Node B from sending to it, <code>false</code> if Node B should send to Node A + * even when backpressure is engaged on Node A. + */ + boolean isPropagateBackpressureAcrossNodes(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LocalQueuePartitionDiagnostics.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LocalQueuePartitionDiagnostics.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LocalQueuePartitionDiagnostics.java new file mode 100644 index 0000000..cc097b1 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LocalQueuePartitionDiagnostics.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public interface LocalQueuePartitionDiagnostics { + QueueSize getUnacknowledgedQueueSize(); + + QueueSize getActiveQueueSize(); + + QueueSize getSwapQueueSize(); + + int getSwapFileCount(); + + boolean isAnyActiveFlowFilePenalized(); + + boolean isAllActiveFlowFilesPenalized(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/QueueDiagnostics.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/QueueDiagnostics.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/QueueDiagnostics.java new file mode 100644 index 0000000..4b5d93f --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/QueueDiagnostics.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import java.util.List; + +public interface QueueDiagnostics { + + LocalQueuePartitionDiagnostics getLocalQueuePartitionDiagnostics(); + + List<RemoteQueuePartitionDiagnostics> getRemoteQueuePartitionDiagnostics(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/RemoteQueuePartitionDiagnostics.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/RemoteQueuePartitionDiagnostics.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/RemoteQueuePartitionDiagnostics.java new file mode 100644 index 0000000..caa6b2d --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/RemoteQueuePartitionDiagnostics.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public interface RemoteQueuePartitionDiagnostics { + String getNodeIdentifier(); + + QueueSize getUnacknowledgedQueueSize(); + + QueueSize getActiveQueueSize(); + + QueueSize getSwapQueueSize(); + + int getSwapFileCount(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index 7092a6f..8d9b38f 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository; import java.io.IOException; import java.util.List; +import java.util.Set; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -44,11 +45,12 @@ public interface FlowFileSwapManager { * * @param flowFiles the FlowFiles to swap out to external storage * @param flowFileQueue the queue that the FlowFiles belong to + * @param partitionName the name of the partition within the queue, or <code>null</code> if the queue is not partitioned * @return the location of the externally stored swap file * * @throws IOException if unable to swap the FlowFiles out */ - String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException; + String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException; /** * Recovers the FlowFiles from the swap file that lives at the given location. This action @@ -82,11 +84,32 @@ public interface FlowFileSwapManager { * Determines swap files that exist for the given FlowFileQueue * * @param flowFileQueue the queue for which the FlowFiles should be recovered + * @param partitionName the partition within the FlowFileQueue to recover, or <code>null</code> if the queue is not partitioned * * @return all swap locations that have been identified for the given queue, in the order that they should * be swapped back in */ - List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException; + List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, String partitionName) throws IOException; + + /** + * Determines the names of each of the Partitions for which there are swap files for the given queue + * + * @param queue the queue to which the FlowFiles belong + * + * @return the Set of names of all Partitions for which there are swap files + * @throws IOException if unable to read the information from the underlying storage + */ + Set<String> getSwappedPartitionNames(FlowFileQueue queue) throws IOException; + + /** + * Updates the name of the partition that owns a given swap file + * + * @param swapLocation the location of the swap file + * @param newPartitionName the new name of the new partition that owns the swap file + * @return the new swap location + * @throws IOException if unable to rename the swap file + */ + String changePartitionName(String swapLocation, String newPartitionName) throws IOException; /** * Parses the contents of the swap file at the given location and provides a SwapSummary that provides http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectionDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectionDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectionDTO.java index a2272e0..f62feac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectionDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectionDTO.java @@ -27,6 +27,9 @@ import java.util.Set; */ @XmlType(name = "connection") public class ConnectionDTO extends ComponentDTO { + public static final String LOAD_BALANCE_NOT_CONFIGURED = "LOAD_BALANCE_NOT_CONFIGURED"; + public static final String LOAD_BALANCE_INACTIVE = "LOAD_BALANCE_INACTIVE"; + public static final String LOAD_BALANCE_ACTIVE = "LOAD_BALANCE_ACTIVE"; private ConnectableDTO source; private ConnectableDTO destination; @@ -42,6 +45,11 @@ public class ConnectionDTO extends ComponentDTO { private List<String> prioritizers; private List<PositionDTO> bends; + private String loadBalanceStrategy; + private String loadBalancePartitionAttribute; + private String loadBalanceCompression; + private String loadBalanceStatus; + /** * The source of this connection. * @@ -231,6 +239,47 @@ public class ConnectionDTO extends ComponentDTO { this.prioritizers = prioritizers; } + @ApiModelProperty(value = "How to load balance the data in this Connection across the nodes in the cluster.", + allowableValues = "DO_NOT_LOAD_BALANCE, PARTITION_BY_ATTRIBUTE, ROUND_ROBIN, SINGLE_NODE") + public String getLoadBalanceStrategy() { + return loadBalanceStrategy; + } + + public void setLoadBalanceStrategy(String loadBalanceStrategy) { + this.loadBalanceStrategy = loadBalanceStrategy; + } + + @ApiModelProperty(value = "The FlowFile Attribute to use for determining which node a FlowFile will go to if the Load Balancing Strategy is set to PARTITION_BY_ATTRIBUTE") + public String getLoadBalancePartitionAttribute() { + return loadBalancePartitionAttribute; + } + + public void setLoadBalancePartitionAttribute(String partitionAttribute) { + this.loadBalancePartitionAttribute = partitionAttribute; + } + + @ApiModelProperty(value = "Whether or not data should be compressed when being transferred between nodes in the cluster.", + allowableValues = "DO_NOT_COMPRESS, COMPRESS_ATTRIBUTES_ONLY, COMPRESS_ATTRIBUTES_AND_CONTENT") + public String getLoadBalanceCompression() { + return loadBalanceCompression; + } + + public void setLoadBalanceCompression(String compression) { + this.loadBalanceCompression = compression; + } + + @ApiModelProperty(value = "The current status of the Connection's Load Balancing Activities. Status can indicate that Load Balancing is not configured for the connection, that Load Balancing " + + "is configured but inactive (not currently transferring data to another node), or that Load Balancing is configured and actively transferring data to another node.", + allowableValues = LOAD_BALANCE_NOT_CONFIGURED + ", " + LOAD_BALANCE_INACTIVE + ", " + LOAD_BALANCE_ACTIVE, + readOnly = true) + public String getLoadBalanceStatus() { + return loadBalanceStatus; + } + + public void setLoadBalanceStatus(String status) { + this.loadBalanceStatus = status; + } + @Override public String toString() { return "ConnectionDTO [id: " + getId() + "]"; http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsDTO.java index 951ac41..05e9aff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsDTO.java @@ -14,135 +14,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.web.api.dto.diagnostics; -import javax.xml.bind.annotation.XmlType; - +import io.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.ConnectionDTO; -import io.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlType; +import java.util.List; -@XmlType(name = "connectionDiagnostics") +@XmlType(name="connectionDiagnostics") public class ConnectionDiagnosticsDTO { private ConnectionDTO connection; - private int totalFlowFileCount; - private long totalByteCount; - private int activeQueueFlowFileCount; - private long activeQueueByteCount; - private int swapFlowFileCount; - private long swapByteCount; - private int swapFiles; - private int inFlightFlowFileCount; - private long inFlightByteCount; - private Boolean allActiveQueueFlowFilesPenalized; - private Boolean anyActiveQueueFlowFilesPenalized; + private ConnectionDiagnosticsSnapshotDTO aggregateSnapshot; + private List<ConnectionDiagnosticsSnapshotDTO> nodeSnapshots; - @ApiModelProperty("Information about the Connection") + @ApiModelProperty(value = "Details about the connection", readOnly = true) public ConnectionDTO getConnection() { return connection; } - public void setConnection(ConnectionDTO connection) { + public void setConnection(final ConnectionDTO connection) { this.connection = connection; } - @ApiModelProperty("Total number of FlowFiles owned by the Connection") - public int getTotalFlowFileCount() { - return totalFlowFileCount; - } - - public void setTotalFlowFileCount(int totalFlowFileCount) { - this.totalFlowFileCount = totalFlowFileCount; - } - - @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles owned by this Connection") - public long getTotalByteCount() { - return totalByteCount; - } - - public void setTotalByteCount(long totalByteCount) { - this.totalByteCount = totalByteCount; - } - - @ApiModelProperty("Total number of FlowFiles that exist in the Connection's Active Queue, immediately available to be offered up to a component") - public int getActiveQueueFlowFileCount() { - return activeQueueFlowFileCount; - } - - public void setActiveQueueFlowFileCount(int activeQueueFlowFileCount) { - this.activeQueueFlowFileCount = activeQueueFlowFileCount; - } - - @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are present in the Connection's Active Queue") - public long getActiveQueueByteCount() { - return activeQueueByteCount; - } - - public void setActiveQueueByteCount(long activeQueueByteCount) { - this.activeQueueByteCount = activeQueueByteCount; - } - - @ApiModelProperty("The total number of FlowFiles that are swapped out for this Connection") - public int getSwapFlowFileCount() { - return swapFlowFileCount; - } - - public void setSwapFlowFileCount(int swapFlowFileCount) { - this.swapFlowFileCount = swapFlowFileCount; - } - - @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are swapped out to disk for the Connection") - public long getSwapByteCount() { - return swapByteCount; - } - - public void setSwapByteCount(long swapByteCount) { - this.swapByteCount = swapByteCount; - } - - @ApiModelProperty("The number of Swap Files that exist for this Connection") - public int getSwapFiles() { - return swapFiles; - } - - public void setSwapFiles(int swapFiles) { - this.swapFiles = swapFiles; - } - - @ApiModelProperty("The number of In-Flight FlowFiles for this Connection. These are FlowFiles that belong to the connection but are currently being operated on by a Processor, Port, etc.") - public int getInFlightFlowFileCount() { - return inFlightFlowFileCount; - } - - public void setInFlightFlowFileCount(int inFlightFlowFileCount) { - this.inFlightFlowFileCount = inFlightFlowFileCount; - } - - @ApiModelProperty("The number bytes that make up the content of the FlowFiles that are In-Flight") - public long getInFlightByteCount() { - return inFlightByteCount; - } - - public void setInFlightByteCount(long inFlightByteCount) { - this.inFlightByteCount = inFlightByteCount; - } - - @ApiModelProperty("Whether or not all of the FlowFiles in the Active Queue are penalized") - public Boolean getAllActiveQueueFlowFilesPenalized() { - return allActiveQueueFlowFilesPenalized; + @ApiModelProperty(value = "Aggregate values for all nodes in the cluster, or for this instance if not clustered", readOnly = true) + public ConnectionDiagnosticsSnapshotDTO getAggregateSnapshot() { + return aggregateSnapshot; } - public void setAllActiveQueueFlowFilesPenalized(Boolean allFlowFilesPenalized) { - this.allActiveQueueFlowFilesPenalized = allFlowFilesPenalized; + public void setAggregateSnapshot(final ConnectionDiagnosticsSnapshotDTO aggregateSnapshot) { + this.aggregateSnapshot = aggregateSnapshot; } - @ApiModelProperty("Whether or not any of the FlowFiles in the Active Queue are penalized") - public Boolean getAnyActiveQueueFlowFilesPenalized() { - return anyActiveQueueFlowFilesPenalized; + @ApiModelProperty(value = "A list of values for each node in the cluster, if clustered.", readOnly = true) + public List<ConnectionDiagnosticsSnapshotDTO> getNodeSnapshots() { + return nodeSnapshots; } - public void setAnyActiveQueueFlowFilesPenalized(Boolean anyFlowFilesPenalized) { - this.anyActiveQueueFlowFilesPenalized = anyFlowFilesPenalized; + public void setNodeSnapshots(final List<ConnectionDiagnosticsSnapshotDTO> nodeSnapshots) { + this.nodeSnapshots = nodeSnapshots; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsSnapshotDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsSnapshotDTO.java new file mode 100644 index 0000000..5926f8d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ConnectionDiagnosticsSnapshotDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto.diagnostics; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; +import java.util.List; + +@XmlType(name = "connectionDiagnosticsSnapshot") +public class ConnectionDiagnosticsSnapshotDTO { + private int totalFlowFileCount; + private long totalByteCount; + private String nodeIdentifier; + private LocalQueuePartitionDTO localQueuePartition; + private List<RemoteQueuePartitionDTO> remoteQueuePartitions; + + @ApiModelProperty("Total number of FlowFiles owned by the Connection") + public int getTotalFlowFileCount() { + return totalFlowFileCount; + } + + public void setTotalFlowFileCount(int totalFlowFileCount) { + this.totalFlowFileCount = totalFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles owned by this Connection") + public long getTotalByteCount() { + return totalByteCount; + } + + public void setTotalByteCount(long totalByteCount) { + this.totalByteCount = totalByteCount; + } + + @ApiModelProperty("The Node Identifier that this information pertains to") + public String getNodeIdentifier() { + return nodeIdentifier; + } + + public void setNodeIdentifier(final String nodeIdentifier) { + this.nodeIdentifier = nodeIdentifier; + } + + @ApiModelProperty("The local queue partition, from which components can pull FlowFiles on this node.") + public LocalQueuePartitionDTO getLocalQueuePartition() { + return localQueuePartition; + } + + public void setLocalQueuePartition(LocalQueuePartitionDTO localQueuePartition) { + this.localQueuePartition = localQueuePartition; + } + + public List<RemoteQueuePartitionDTO> getRemoteQueuePartitions() { + return remoteQueuePartitions; + } + + public void setRemoteQueuePartitions(List<RemoteQueuePartitionDTO> remoteQueuePartitions) { + this.remoteQueuePartitions = remoteQueuePartitions; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/LocalQueuePartitionDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/LocalQueuePartitionDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/LocalQueuePartitionDTO.java new file mode 100644 index 0000000..971c62a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/LocalQueuePartitionDTO.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto.diagnostics; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "localQueuePartition") +public class LocalQueuePartitionDTO { + private int totalFlowFileCount; + private long totalByteCount; + private int activeQueueFlowFileCount; + private long activeQueueByteCount; + private int swapFlowFileCount; + private long swapByteCount; + private int swapFiles; + private int inFlightFlowFileCount; + private long inFlightByteCount; + private Boolean allActiveQueueFlowFilesPenalized; + private Boolean anyActiveQueueFlowFilesPenalized; + + @ApiModelProperty("Total number of FlowFiles owned by the Connection") + public int getTotalFlowFileCount() { + return totalFlowFileCount; + } + + public void setTotalFlowFileCount(int totalFlowFileCount) { + this.totalFlowFileCount = totalFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles owned by this Connection") + public long getTotalByteCount() { + return totalByteCount; + } + + public void setTotalByteCount(long totalByteCount) { + this.totalByteCount = totalByteCount; + } + + @ApiModelProperty("Total number of FlowFiles that exist in the Connection's Active Queue, immediately available to be offered up to a component") + public int getActiveQueueFlowFileCount() { + return activeQueueFlowFileCount; + } + + public void setActiveQueueFlowFileCount(int activeQueueFlowFileCount) { + this.activeQueueFlowFileCount = activeQueueFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are present in the Connection's Active Queue") + public long getActiveQueueByteCount() { + return activeQueueByteCount; + } + + public void setActiveQueueByteCount(long activeQueueByteCount) { + this.activeQueueByteCount = activeQueueByteCount; + } + + @ApiModelProperty("The total number of FlowFiles that are swapped out for this Connection") + public int getSwapFlowFileCount() { + return swapFlowFileCount; + } + + public void setSwapFlowFileCount(int swapFlowFileCount) { + this.swapFlowFileCount = swapFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are swapped out to disk for the Connection") + public long getSwapByteCount() { + return swapByteCount; + } + + public void setSwapByteCount(long swapByteCount) { + this.swapByteCount = swapByteCount; + } + + @ApiModelProperty("The number of Swap Files that exist for this Connection") + public int getSwapFiles() { + return swapFiles; + } + + public void setSwapFiles(int swapFiles) { + this.swapFiles = swapFiles; + } + + @ApiModelProperty("The number of In-Flight FlowFiles for this Connection. These are FlowFiles that belong to the connection but are currently being operated on by a Processor, Port, etc.") + public int getInFlightFlowFileCount() { + return inFlightFlowFileCount; + } + + public void setInFlightFlowFileCount(int inFlightFlowFileCount) { + this.inFlightFlowFileCount = inFlightFlowFileCount; + } + + @ApiModelProperty("The number bytes that make up the content of the FlowFiles that are In-Flight") + public long getInFlightByteCount() { + return inFlightByteCount; + } + + public void setInFlightByteCount(long inFlightByteCount) { + this.inFlightByteCount = inFlightByteCount; + } + + @ApiModelProperty("Whether or not all of the FlowFiles in the Active Queue are penalized") + public Boolean getAllActiveQueueFlowFilesPenalized() { + return allActiveQueueFlowFilesPenalized; + } + + public void setAllActiveQueueFlowFilesPenalized(Boolean allFlowFilesPenalized) { + this.allActiveQueueFlowFilesPenalized = allFlowFilesPenalized; + } + + @ApiModelProperty("Whether or not any of the FlowFiles in the Active Queue are penalized") + public Boolean getAnyActiveQueueFlowFilesPenalized() { + return anyActiveQueueFlowFilesPenalized; + } + + public void setAnyActiveQueueFlowFilesPenalized(Boolean anyFlowFilesPenalized) { + this.anyActiveQueueFlowFilesPenalized = anyFlowFilesPenalized; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ProcessorDiagnosticsDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ProcessorDiagnosticsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ProcessorDiagnosticsDTO.java index 77f5499..ccf759e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ProcessorDiagnosticsDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/ProcessorDiagnosticsDTO.java @@ -17,15 +17,13 @@ package org.apache.nifi.web.api.dto.diagnostics; -import java.util.List; -import java.util.Set; - -import javax.xml.bind.annotation.XmlType; - +import io.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; -import io.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlType; +import java.util.List; +import java.util.Set; @XmlType(name = "processorDiagnostics") public class ProcessorDiagnosticsDTO { http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/RemoteQueuePartitionDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/RemoteQueuePartitionDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/RemoteQueuePartitionDTO.java new file mode 100644 index 0000000..9248714 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/diagnostics/RemoteQueuePartitionDTO.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto.diagnostics; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "remoteQueuePartition") +public class RemoteQueuePartitionDTO { + private String nodeId; + private int totalFlowFileCount; + private long totalByteCount; + private int activeQueueFlowFileCount; + private long activeQueueByteCount; + private int swapFlowFileCount; + private long swapByteCount; + private int swapFiles; + private int inFlightFlowFileCount; + private long inFlightByteCount; + + @ApiModelProperty("The Node Identifier that this queue partition is sending to") + public String getNodeIdentifier() { + return nodeId; + } + + public void setNodeIdentifier(String nodeId) { + this.nodeId = nodeId; + } + + @ApiModelProperty("Total number of FlowFiles owned by the Connection") + public int getTotalFlowFileCount() { + return totalFlowFileCount; + } + + public void setTotalFlowFileCount(int totalFlowFileCount) { + this.totalFlowFileCount = totalFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles owned by this Connection") + public long getTotalByteCount() { + return totalByteCount; + } + + public void setTotalByteCount(long totalByteCount) { + this.totalByteCount = totalByteCount; + } + + @ApiModelProperty("Total number of FlowFiles that exist in the Connection's Active Queue, immediately available to be offered up to a component") + public int getActiveQueueFlowFileCount() { + return activeQueueFlowFileCount; + } + + public void setActiveQueueFlowFileCount(int activeQueueFlowFileCount) { + this.activeQueueFlowFileCount = activeQueueFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are present in the Connection's Active Queue") + public long getActiveQueueByteCount() { + return activeQueueByteCount; + } + + public void setActiveQueueByteCount(long activeQueueByteCount) { + this.activeQueueByteCount = activeQueueByteCount; + } + + @ApiModelProperty("The total number of FlowFiles that are swapped out for this Connection") + public int getSwapFlowFileCount() { + return swapFlowFileCount; + } + + public void setSwapFlowFileCount(int swapFlowFileCount) { + this.swapFlowFileCount = swapFlowFileCount; + } + + @ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are swapped out to disk for the Connection") + public long getSwapByteCount() { + return swapByteCount; + } + + public void setSwapByteCount(long swapByteCount) { + this.swapByteCount = swapByteCount; + } + + @ApiModelProperty("The number of Swap Files that exist for this Connection") + public int getSwapFiles() { + return swapFiles; + } + + public void setSwapFiles(int swapFiles) { + this.swapFiles = swapFiles; + } + + @ApiModelProperty("The number of In-Flight FlowFiles for this Connection. These are FlowFiles that belong to the connection but are currently being operated on by a Processor, Port, etc.") + public int getInFlightFlowFileCount() { + return inFlightFlowFileCount; + } + + public void setInFlightFlowFileCount(int inFlightFlowFileCount) { + this.inFlightFlowFileCount = inFlightFlowFileCount; + } + + @ApiModelProperty("The number bytes that make up the content of the FlowFiles that are In-Flight") + public long getInFlightByteCount() { + return inFlightByteCount; + } + + public void setInFlightByteCount(long inFlightByteCount) { + this.inFlightByteCount = inFlightByteCount; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 1083fe6..11786c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -17,11 +17,6 @@ package org.apache.nifi.cluster.coordination; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -31,6 +26,11 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * <p> * Responsible for coordinating nodes in the cluster @@ -127,12 +127,12 @@ public interface ClusterCoordinator { * <code>true</code> if the node is blocked, <code>false</code> if the node is * allowed through the firewall or if there is no firewall configured * - * @param hostname the hostname of the node that is attempting to connect to the cluster + * @param nodeIdentities the identities of the node that is attempting to connect to the cluster * * @return <code>true</code> if the node is blocked, <code>false</code> if the node is * allowed through the firewall or if there is no firewall configured */ - boolean isBlockedByFirewall(String hostname); + boolean isBlockedByFirewall(Set<String> nodeIdentities); /** * Reports that some event occurred that is relevant to the cluster @@ -244,4 +244,16 @@ public interface ClusterCoordinator { * @throws IOException thrown when it failed to communicate with the cluster coordinator. */ Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException; + + /** + * Registers the given event listener so that it is notified whenever a cluster topology event occurs + * @param eventListener the event listener to notify + */ + void registerEventListener(ClusterTopologyEventListener eventListener); + + /** + * Stops notifying the given listener when cluster topology events occurs + * @param eventListener the event listener to stop notifying + */ + void unregisterEventListener(ClusterTopologyEventListener eventListener); } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java new file mode 100644 index 0000000..54cc4de --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +public interface ClusterTopologyEventListener { + + void onNodeAdded(NodeIdentifier nodeId); + + void onNodeRemoved(NodeIdentifier nodeId); + + void onLocalNodeIdentifierSet(NodeIdentifier localNodeId); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java index f4475df..b17ec2b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java @@ -16,11 +16,14 @@ */ package org.apache.nifi.cluster.protocol; +import org.apache.commons.lang3.StringUtils; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.commons.lang3.StringUtils; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; /** * A node identifier denoting the coordinates of a flow controller that is @@ -63,11 +66,21 @@ public class NodeIdentifier { private final String socketAddress; /** - * the port to use use for sending requests to the node's internal interface + * the port to use for sending requests to the node's internal interface */ private final int socketPort; /** + * The IP or hostname to use for sending FlowFiles when load balancing a connection + */ + private final String loadBalanceAddress; + + /** + * the port to use for sending FlowFiles when load balancing a connection + */ + private final int loadBalancePort; + + /** * the IP or hostname that external clients should use to communicate with this node via Site-to-Site */ private final String siteToSiteAddress; @@ -89,15 +102,20 @@ public class NodeIdentifier { private final Boolean siteToSiteSecure; - private final String nodeDn; + private final Set<String> nodeIdentities; public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, + final String siteToSiteAddress, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, final boolean siteToSiteSecure) { + this(id, apiAddress, apiPort, socketAddress, socketPort, socketAddress, 6342, siteToSiteAddress, siteToSitePort, siteToSiteHttpApiPort, siteToSiteSecure, null); + } + + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String loadBalanceAddress, final int loadBalancePort, final String siteToSiteAddress, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, final boolean siteToSiteSecure) { - this(id, apiAddress, apiPort, socketAddress, socketPort, siteToSiteAddress, siteToSitePort, siteToSiteHttpApiPort, siteToSiteSecure, null); + this(id, apiAddress, apiPort, socketAddress, socketPort, loadBalanceAddress, loadBalancePort, siteToSiteAddress, siteToSitePort, siteToSiteHttpApiPort, siteToSiteSecure, null); } - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, - final String siteToSiteAddress, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, final boolean siteToSiteSecure, final String dn) { + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String loadBalanceAddress, final int loadBalancePort, + final String siteToSiteAddress, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, final boolean siteToSiteSecure, final Set<String> nodeIdentities) { if (StringUtils.isBlank(id)) { throw new IllegalArgumentException("Node ID may not be empty or null."); @@ -109,6 +127,7 @@ public class NodeIdentifier { validatePort(apiPort); validatePort(socketPort); + validatePort(loadBalancePort); if (siteToSitePort != null) { validatePort(siteToSitePort); } @@ -118,7 +137,9 @@ public class NodeIdentifier { this.apiPort = apiPort; this.socketAddress = socketAddress; this.socketPort = socketPort; - this.nodeDn = dn; + this.loadBalanceAddress = loadBalanceAddress; + this.loadBalancePort = loadBalancePort; + this.nodeIdentities = nodeIdentities == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(nodeIdentities)); this.siteToSiteAddress = siteToSiteAddress == null ? apiAddress : siteToSiteAddress; this.siteToSitePort = siteToSitePort; this.siteToSiteHttpApiPort = siteToSiteHttpApiPort; @@ -134,7 +155,9 @@ public class NodeIdentifier { this.apiPort = 0; this.socketAddress = null; this.socketPort = 0; - this.nodeDn = null; + this.loadBalanceAddress = null; + this.loadBalancePort = 0; + this.nodeIdentities = Collections.emptySet(); this.siteToSiteAddress = null; this.siteToSitePort = null; this.siteToSiteHttpApiPort = null; @@ -145,8 +168,8 @@ public class NodeIdentifier { return id; } - public String getDN() { - return nodeDn; + public Set<String> getNodeIdentities() { + return nodeIdentities; } public String getApiAddress() { @@ -165,6 +188,14 @@ public class NodeIdentifier { return socketPort; } + public String getLoadBalanceAddress() { + return loadBalanceAddress; + } + + public int getLoadBalancePort() { + return loadBalancePort; + } + private void validatePort(final int port) { if (port < 1 || port > 65535) { throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port); @@ -235,6 +266,12 @@ public class NodeIdentifier { if (this.socketPort != other.socketPort) { return false; } + if (!this.loadBalanceAddress.equals(other.loadBalanceAddress)) { + return false; + } + if (this.loadBalancePort != other.loadBalancePort) { + return false; + } return true; } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java index b2bace9..836ad7a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java @@ -18,6 +18,8 @@ package org.apache.nifi.cluster.protocol; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import java.util.Set; + /** * A handler for processing protocol messages. * @@ -30,11 +32,12 @@ public interface ProtocolHandler { * should be returned. * * @param msg a message + * @param nodeIdentities the set of identities for this node * @return a response or null, if no response is necessary * * @throws ProtocolException if the message could not be processed */ - ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException; + ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException; /** * @param msg a message
