NIFI-410: Pushed RAT exclusions down to the relevant modules and stopped doing broad test resource exclusion; the exclusions are now specific
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/761e64a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/761e64a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/761e64a4 Branch: refs/heads/NIFI-250 Commit: 761e64a410a837b4fd756fd730f07621fcea8b37 Parents: eb757a4 Author: joewitt <[email protected]> Authored: Mon Mar 16 22:48:14 2015 -0400 Committer: joewitt <[email protected]> Committed: Mon Mar 16 22:48:14 2015 -0400 ---------------------------------------------------------------------- .../nifi-cluster-protocol/.gitignore | 1 - .../nifi-cluster-protocol/pom.xml | 67 - .../protocol/ClusterManagerProtocolSender.java | 69 - .../cluster/protocol/ConnectionRequest.java | 44 - .../cluster/protocol/ConnectionResponse.java | 141 - .../apache/nifi/cluster/protocol/Heartbeat.java | 68 - .../nifi/cluster/protocol/NodeBulletins.java | 44 - .../nifi/cluster/protocol/NodeIdentifier.java | 172 - .../cluster/protocol/NodeProtocolSender.java | 73 - .../nifi/cluster/protocol/ProtocolContext.java | 39 - .../cluster/protocol/ProtocolException.java | 40 - .../nifi/cluster/protocol/ProtocolHandler.java | 44 - .../nifi/cluster/protocol/ProtocolListener.java | 72 - .../protocol/ProtocolMessageMarshaller.java | 38 - .../protocol/ProtocolMessageUnmarshaller.java | 38 - .../nifi/cluster/protocol/StandardDataFlow.java | 105 - .../UnknownServiceAddressException.java | 39 - .../impl/ClusterManagerProtocolSenderImpl.java | 245 -- .../ClusterManagerProtocolSenderListener.java | 118 - .../protocol/impl/ClusterServiceDiscovery.java | 181 - .../protocol/impl/ClusterServiceLocator.java | 229 -- .../impl/ClusterServicesBroadcaster.java | 182 - .../protocol/impl/CopyingInputStream.java | 77 - .../impl/MulticastProtocolListener.java | 204 - .../protocol/impl/NodeProtocolSenderImpl.java | 171 - .../impl/NodeProtocolSenderListener.java | 115 - 
.../protocol/impl/SocketProtocolListener.java | 205 - .../protocol/jaxb/JaxbProtocolContext.java | 148 - .../jaxb/message/AdaptedConnectionRequest.java | 40 - .../jaxb/message/AdaptedConnectionResponse.java | 109 - .../protocol/jaxb/message/AdaptedCounter.java | 56 - .../protocol/jaxb/message/AdaptedDataFlow.java | 64 - .../protocol/jaxb/message/AdaptedHeartbeat.java | 66 - .../jaxb/message/AdaptedNodeBulletins.java | 50 - .../jaxb/message/AdaptedNodeIdentifier.java | 76 - .../jaxb/message/ConnectionRequestAdapter.java | 41 - .../jaxb/message/ConnectionResponseAdapter.java | 55 - .../protocol/jaxb/message/DataFlowAdapter.java | 50 - .../protocol/jaxb/message/HeartbeatAdapter.java | 54 - .../jaxb/message/JaxbProtocolUtils.java | 42 - .../jaxb/message/NodeBulletinsAdapter.java | 48 - .../jaxb/message/NodeIdentifierAdapter.java | 51 - .../protocol/jaxb/message/ObjectFactory.java | 104 - .../message/ConnectionRequestMessage.java | 46 - .../message/ConnectionResponseMessage.java | 66 - .../ControllerStartupFailureMessage.java | 49 - .../protocol/message/DisconnectMessage.java | 55 - .../protocol/message/ExceptionMessage.java | 44 - .../protocol/message/FlowRequestMessage.java | 46 - .../protocol/message/FlowResponseMessage.java | 44 - .../protocol/message/HeartbeatMessage.java | 43 - .../message/MulticastProtocolMessage.java | 66 - .../protocol/message/NodeBulletinsMessage.java | 43 - .../cluster/protocol/message/PingMessage.java | 55 - .../message/PrimaryRoleAssignmentMessage.java | 56 - .../protocol/message/ProtocolMessage.java | 61 - .../message/ReconnectionFailureMessage.java | 45 - .../message/ReconnectionRequestMessage.java | 94 - .../message/ReconnectionResponseMessage.java | 32 - .../message/ServiceBroadcastMessage.java | 64 - .../MulticastConfigurationFactoryBean.java | 60 - .../ServerSocketConfigurationFactoryBean.java | 65 - .../spring/SocketConfigurationFactoryBean.java | 66 - .../resources/nifi-cluster-protocol-context.xml | 110 - 
.../ClusterManagerProtocolSenderImplTest.java | 134 - .../impl/ClusterServiceDiscoveryTest.java | 135 - .../impl/ClusterServiceLocatorTest.java | 121 - .../impl/ClusterServicesBroadcasterTest.java | 133 - .../impl/MulticastProtocolListenerTest.java | 171 - .../impl/NodeProtocolSenderImplTest.java | 203 - .../impl/testutils/DelayedProtocolHandler.java | 57 - .../testutils/ReflexiveProtocolHandler.java | 47 - .../nifi-framework/nifi-cluster-web/.gitignore | 1 - .../nifi-framework/nifi-cluster-web/pom.xml | 48 - .../nifi/cluster/context/ClusterContext.java | 59 - .../cluster/context/ClusterContextImpl.java | 69 - .../context/ClusterContextThreadLocal.java | 47 - .../ClusterAwareOptimisticLockingManager.java | 96 - .../nifi-framework/nifi-cluster/.gitignore | 1 - .../nifi-framework/nifi-cluster/pom.xml | 130 - .../cluster/client/MulticastTestClient.java | 151 - .../org/apache/nifi/cluster/event/Event.java | 122 - .../apache/nifi/cluster/event/EventManager.java | 65 - .../cluster/event/impl/EventManagerImpl.java | 143 - .../cluster/firewall/ClusterNodeFirewall.java | 35 - .../impl/FileBasedClusterNodeFirewall.java | 207 - .../nifi/cluster/flow/ClusterDataFlow.java | 45 - .../apache/nifi/cluster/flow/DaoException.java | 40 - .../apache/nifi/cluster/flow/DataFlowDao.java | 62 - .../cluster/flow/DataFlowManagementService.java | 115 - .../nifi/cluster/flow/PersistedFlowState.java | 37 - .../nifi/cluster/flow/StaleFlowException.java | 42 - .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 600 --- .../impl/DataFlowManagementServiceImpl.java | 356 -- .../nifi/cluster/manager/ClusterManager.java | 225 -- .../cluster/manager/HttpClusterManager.java | 169 - .../cluster/manager/HttpRequestReplicator.java | 99 - .../cluster/manager/HttpResponseMapper.java | 42 - .../nifi/cluster/manager/NodeResponse.java | 329 -- .../exception/BlockedByFirewallException.java | 60 - .../manager/exception/ClusterException.java | 40 - .../ConnectingNodeMutableRequestException.java | 41 - 
...DisconnectedNodeMutableRequestException.java | 41 - .../exception/IllegalClusterStateException.java | 41 - .../exception/IllegalNodeDeletionException.java | 41 - .../IllegalNodeDisconnectionException.java | 42 - .../IllegalNodeReconnectionException.java | 41 - .../IneligiblePrimaryNodeException.java | 41 - .../exception/MutableRequestException.java | 42 - .../exception/NoConnectedNodesException.java | 41 - .../exception/NoResponseFromNodesException.java | 42 - .../exception/NodeDisconnectionException.java | 41 - .../exception/NodeReconnectionException.java | 40 - .../PrimaryRoleAssignmentException.java | 41 - .../SafeModeMutableRequestException.java | 41 - .../manager/exception/UnknownNodeException.java | 41 - .../exception/UriConstructionException.java | 42 - .../manager/impl/ClusteredEventAccess.java | 135 - .../manager/impl/ClusteredReportingContext.java | 165 - .../manager/impl/HttpRequestReplicatorImpl.java | 531 --- .../manager/impl/HttpResponseMapperImpl.java | 85 - .../cluster/manager/impl/WebClusterManager.java | 3628 ------------------ .../java/org/apache/nifi/cluster/node/Node.java | 252 -- ...anagerProtocolServiceLocatorFactoryBean.java | 116 - ...FileBasedClusterNodeFirewallFactoryBean.java | 58 - .../spring/WebClusterManagerFactoryBean.java | 139 - .../reporting/ClusteredReportingTaskNode.java | 49 - .../resources/nifi-cluster-manager-context.xml | 124 - .../event/impl/EventManagerImplTest.java | 119 - .../impl/FileBasedClusterNodeFirewallTest.java | 98 - .../impl/DataFlowManagementServiceImplTest.java | 343 -- .../impl/HttpRequestReplicatorImplTest.java | 368 -- .../impl/HttpResponseMapperImplTest.java | 126 - .../manager/impl/TestWebClusterManager.java | 54 - .../cluster/manager/testutils/HttpRequest.java | 239 -- .../cluster/manager/testutils/HttpResponse.java | 93 - .../manager/testutils/HttpResponseAction.java | 60 - .../cluster/manager/testutils/HttpServer.java | 240 -- .../ClusterManagerProtocolSenderImplTest.java | 133 - 
.../impl/ClusterServiceLocatorTest.java | 119 - .../impl/ClusterServicesBroadcasterTest.java | 131 - .../impl/MulticastProtocolListenerTest.java | 171 - .../impl/NodeProtocolSenderImplTest.java | 201 - .../impl/SocketProtocolListenerTest.java | 132 - .../testutils/DelayedProtocolHandler.java | 57 - .../testutils/ReflexiveProtocolHandler.java | 47 - .../src/test/resources/logback-test.xml | 48 - .../apache/nifi/cluster/firewall/impl/empty.txt | 0 .../apache/nifi/cluster/firewall/impl/ips.txt | 12 - .../nifi-framework-cluster-protocol/.gitignore | 1 + .../nifi-framework-cluster-protocol/pom.xml | 67 + .../protocol/ClusterManagerProtocolSender.java | 69 + .../cluster/protocol/ConnectionRequest.java | 44 + .../cluster/protocol/ConnectionResponse.java | 141 + .../apache/nifi/cluster/protocol/Heartbeat.java | 68 + .../nifi/cluster/protocol/NodeBulletins.java | 44 + .../nifi/cluster/protocol/NodeIdentifier.java | 172 + .../cluster/protocol/NodeProtocolSender.java | 73 + .../nifi/cluster/protocol/ProtocolContext.java | 39 + .../cluster/protocol/ProtocolException.java | 40 + .../nifi/cluster/protocol/ProtocolHandler.java | 44 + .../nifi/cluster/protocol/ProtocolListener.java | 72 + .../protocol/ProtocolMessageMarshaller.java | 38 + .../protocol/ProtocolMessageUnmarshaller.java | 38 + .../nifi/cluster/protocol/StandardDataFlow.java | 105 + .../UnknownServiceAddressException.java | 39 + .../impl/ClusterManagerProtocolSenderImpl.java | 245 ++ .../ClusterManagerProtocolSenderListener.java | 118 + .../protocol/impl/ClusterServiceDiscovery.java | 181 + .../protocol/impl/ClusterServiceLocator.java | 229 ++ .../impl/ClusterServicesBroadcaster.java | 182 + .../protocol/impl/CopyingInputStream.java | 77 + .../impl/MulticastProtocolListener.java | 204 + .../protocol/impl/NodeProtocolSenderImpl.java | 171 + .../impl/NodeProtocolSenderListener.java | 115 + .../protocol/impl/SocketProtocolListener.java | 205 + .../protocol/jaxb/JaxbProtocolContext.java | 148 + 
.../jaxb/message/AdaptedConnectionRequest.java | 40 + .../jaxb/message/AdaptedConnectionResponse.java | 109 + .../protocol/jaxb/message/AdaptedCounter.java | 56 + .../protocol/jaxb/message/AdaptedDataFlow.java | 64 + .../protocol/jaxb/message/AdaptedHeartbeat.java | 66 + .../jaxb/message/AdaptedNodeBulletins.java | 50 + .../jaxb/message/AdaptedNodeIdentifier.java | 76 + .../jaxb/message/ConnectionRequestAdapter.java | 41 + .../jaxb/message/ConnectionResponseAdapter.java | 55 + .../protocol/jaxb/message/DataFlowAdapter.java | 50 + .../protocol/jaxb/message/HeartbeatAdapter.java | 54 + .../jaxb/message/JaxbProtocolUtils.java | 42 + .../jaxb/message/NodeBulletinsAdapter.java | 48 + .../jaxb/message/NodeIdentifierAdapter.java | 51 + .../protocol/jaxb/message/ObjectFactory.java | 104 + .../message/ConnectionRequestMessage.java | 46 + .../message/ConnectionResponseMessage.java | 66 + .../ControllerStartupFailureMessage.java | 49 + .../protocol/message/DisconnectMessage.java | 55 + .../protocol/message/ExceptionMessage.java | 44 + .../protocol/message/FlowRequestMessage.java | 46 + .../protocol/message/FlowResponseMessage.java | 44 + .../protocol/message/HeartbeatMessage.java | 43 + .../message/MulticastProtocolMessage.java | 66 + .../protocol/message/NodeBulletinsMessage.java | 43 + .../cluster/protocol/message/PingMessage.java | 55 + .../message/PrimaryRoleAssignmentMessage.java | 56 + .../protocol/message/ProtocolMessage.java | 61 + .../message/ReconnectionFailureMessage.java | 45 + .../message/ReconnectionRequestMessage.java | 94 + .../message/ReconnectionResponseMessage.java | 32 + .../message/ServiceBroadcastMessage.java | 64 + .../MulticastConfigurationFactoryBean.java | 60 + .../ServerSocketConfigurationFactoryBean.java | 65 + .../spring/SocketConfigurationFactoryBean.java | 66 + .../resources/nifi-cluster-protocol-context.xml | 110 + .../ClusterManagerProtocolSenderImplTest.java | 134 + .../impl/ClusterServiceDiscoveryTest.java | 135 + 
.../impl/ClusterServiceLocatorTest.java | 121 + .../impl/ClusterServicesBroadcasterTest.java | 133 + .../impl/MulticastProtocolListenerTest.java | 171 + .../impl/NodeProtocolSenderImplTest.java | 203 + .../impl/testutils/DelayedProtocolHandler.java | 57 + .../testutils/ReflexiveProtocolHandler.java | 47 + .../nifi-framework-cluster-web/.gitignore | 1 + .../nifi-framework-cluster-web/pom.xml | 48 + .../nifi/cluster/context/ClusterContext.java | 59 + .../cluster/context/ClusterContextImpl.java | 69 + .../context/ClusterContextThreadLocal.java | 47 + .../ClusterAwareOptimisticLockingManager.java | 96 + .../nifi-framework-cluster/.gitignore | 1 + .../nifi-framework-cluster/pom.xml | 144 + .../cluster/client/MulticastTestClient.java | 151 + .../org/apache/nifi/cluster/event/Event.java | 122 + .../apache/nifi/cluster/event/EventManager.java | 65 + .../cluster/event/impl/EventManagerImpl.java | 143 + .../cluster/firewall/ClusterNodeFirewall.java | 35 + .../impl/FileBasedClusterNodeFirewall.java | 207 + .../nifi/cluster/flow/ClusterDataFlow.java | 45 + .../apache/nifi/cluster/flow/DaoException.java | 40 + .../apache/nifi/cluster/flow/DataFlowDao.java | 62 + .../cluster/flow/DataFlowManagementService.java | 115 + .../nifi/cluster/flow/PersistedFlowState.java | 37 + .../nifi/cluster/flow/StaleFlowException.java | 42 + .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 600 +++ .../impl/DataFlowManagementServiceImpl.java | 356 ++ .../nifi/cluster/manager/ClusterManager.java | 225 ++ .../cluster/manager/HttpClusterManager.java | 169 + .../cluster/manager/HttpRequestReplicator.java | 99 + .../cluster/manager/HttpResponseMapper.java | 42 + .../nifi/cluster/manager/NodeResponse.java | 329 ++ .../exception/BlockedByFirewallException.java | 60 + .../manager/exception/ClusterException.java | 40 + .../ConnectingNodeMutableRequestException.java | 41 + ...DisconnectedNodeMutableRequestException.java | 41 + .../exception/IllegalClusterStateException.java | 41 + 
.../exception/IllegalNodeDeletionException.java | 41 + .../IllegalNodeDisconnectionException.java | 42 + .../IllegalNodeReconnectionException.java | 41 + .../IneligiblePrimaryNodeException.java | 41 + .../exception/MutableRequestException.java | 42 + .../exception/NoConnectedNodesException.java | 41 + .../exception/NoResponseFromNodesException.java | 42 + .../exception/NodeDisconnectionException.java | 41 + .../exception/NodeReconnectionException.java | 40 + .../PrimaryRoleAssignmentException.java | 41 + .../SafeModeMutableRequestException.java | 41 + .../manager/exception/UnknownNodeException.java | 41 + .../exception/UriConstructionException.java | 42 + .../manager/impl/ClusteredEventAccess.java | 135 + .../manager/impl/ClusteredReportingContext.java | 165 + .../manager/impl/HttpRequestReplicatorImpl.java | 531 +++ .../manager/impl/HttpResponseMapperImpl.java | 85 + .../cluster/manager/impl/WebClusterManager.java | 3628 ++++++++++++++++++ .../java/org/apache/nifi/cluster/node/Node.java | 252 ++ ...anagerProtocolServiceLocatorFactoryBean.java | 116 + ...FileBasedClusterNodeFirewallFactoryBean.java | 58 + .../spring/WebClusterManagerFactoryBean.java | 139 + .../reporting/ClusteredReportingTaskNode.java | 49 + .../resources/nifi-cluster-manager-context.xml | 124 + .../event/impl/EventManagerImplTest.java | 119 + .../impl/FileBasedClusterNodeFirewallTest.java | 98 + .../impl/DataFlowManagementServiceImplTest.java | 343 ++ .../impl/HttpRequestReplicatorImplTest.java | 368 ++ .../impl/HttpResponseMapperImplTest.java | 126 + .../manager/impl/TestWebClusterManager.java | 54 + .../cluster/manager/testutils/HttpRequest.java | 239 ++ .../cluster/manager/testutils/HttpResponse.java | 93 + .../manager/testutils/HttpResponseAction.java | 60 + .../cluster/manager/testutils/HttpServer.java | 240 ++ .../ClusterManagerProtocolSenderImplTest.java | 133 + .../impl/ClusterServiceLocatorTest.java | 119 + .../impl/ClusterServicesBroadcasterTest.java | 131 + 
.../impl/MulticastProtocolListenerTest.java | 171 + .../impl/NodeProtocolSenderImplTest.java | 201 + .../impl/SocketProtocolListenerTest.java | 132 + .../testutils/DelayedProtocolHandler.java | 57 + .../testutils/ReflexiveProtocolHandler.java | 47 + .../src/test/resources/logback-test.xml | 48 + .../apache/nifi/cluster/firewall/impl/empty.txt | 0 .../apache/nifi/cluster/firewall/impl/ips.txt | 12 + .../nifi-framework/nifi-framework-core/pom.xml | 19 +- .../nifi-framework/nifi-web/nifi-web-ui/pom.xml | 13 + .../nifi-framework/pom.xml | 6 +- .../nifi-standard-processors/pom.xml | 86 +- nifi/pom.xml | 36 +- 303 files changed, 18400 insertions(+), 18280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/.gitignore ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/pom.xml deleted file mode 100644 index dc70905..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/pom.xml +++ /dev/null @@ -1,67 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - 
Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework</artifactId> - <version>0.0.3-incubating-SNAPSHOT</version> - </parent> - <artifactId>nifi-framework-cluster-protocol</artifactId> - <packaging>jar</packaging> - <description>The messaging protocol for clustered NiFi</description> - <dependencies> - <!-- application dependencies --> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-logging-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-socket-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - 
<artifactId>nifi-framework-core-api</artifactId> - </dependency> - - <!-- spring dependencies --> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java deleted file mode 100644 index fa1547f..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.reporting.BulletinRepository; - -/** - * An interface for sending protocol messages from the cluster manager to nodes. - * - * @author unattributed - */ -public interface ClusterManagerProtocolSender { - - /** - * Sends a "flow request" message to a node. - * @param msg a message - * @return the response - * @throws ProtocolException if communication failed - */ - FlowResponseMessage requestFlow(FlowRequestMessage msg) throws ProtocolException; - - /** - * Sends a "reconnection request" message to a node. - * @param msg a message - * @return - * @throws ProtocolException if communication failed - */ - ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException; - - /** - * Sends a "disconnection request" message to a node. - * @param msg a message - * @throws ProtocolException if communication failed - */ - void disconnect(DisconnectMessage msg) throws ProtocolException; - - /** - * Sends an "assign primary role" message to a node. 
- * @param msg a message - * @throws ProtocolException if communication failed - */ - void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException; - - /** - * Sets the {@link BulletinRepository} that can be used to report bulletins - * @param bulletinRepository - */ - void setBulletinRepository(final BulletinRepository bulletinRepository); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java deleted file mode 100644 index 1b5d007..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter; - -/** - * A node's request to connect to the cluster. The request contains a proposed - * identifier. - * - * @author unattributed - */ -@XmlJavaTypeAdapter(ConnectionRequestAdapter.class) -public class ConnectionRequest { - - private final NodeIdentifier proposedNodeIdentifier; - - public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) { - if(proposedNodeIdentifier == null) { - throw new IllegalArgumentException("Proposed node identifier may not be null."); - } - this.proposedNodeIdentifier = proposedNodeIdentifier; - } - - public NodeIdentifier getProposedNodeIdentifier() { - return proposedNodeIdentifier; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java deleted file mode 100644 index 7a5ff2b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter; - -/** - * The cluster manager's response to a node's connection request. If the manager - * has a current copy of the data flow, then it is returned with a node identifier - * to the node. Otherwise, the manager will provide a "try again in X seconds" - * response to the node in hopes that a current data flow will be available upon - * subsequent requests. 
- * - * @author unattributed - */ -@XmlJavaTypeAdapter(ConnectionResponseAdapter.class) -public class ConnectionResponse { - - private final boolean blockedByFirewall; - private final int tryLaterSeconds; - private final NodeIdentifier nodeIdentifier; - private final StandardDataFlow dataFlow; - private final boolean primary; - private final Integer managerRemoteInputPort; - private final Boolean managerRemoteCommsSecure; - private final String instanceId; - - private volatile String clusterManagerDN; - - public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, - final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { - if(nodeIdentifier == null) { - throw new IllegalArgumentException("Node identifier may not be empty or null."); - } else if(dataFlow == null) { - throw new IllegalArgumentException("DataFlow may not be null."); - } - this.nodeIdentifier = nodeIdentifier; - this.dataFlow = dataFlow; - this.tryLaterSeconds = 0; - this.blockedByFirewall = false; - this.primary = primary; - this.managerRemoteInputPort = managerRemoteInputPort; - this.managerRemoteCommsSecure = managerRemoteCommsSecure; - this.instanceId = instanceId; - } - - public ConnectionResponse(final int tryLaterSeconds) { - if(tryLaterSeconds <= 0) { - throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds); - } - this.dataFlow = null; - this.nodeIdentifier = null; - this.tryLaterSeconds = tryLaterSeconds; - this.blockedByFirewall = false; - this.primary = false; - this.managerRemoteInputPort = null; - this.managerRemoteCommsSecure = null; - this.instanceId = null; - } - - private ConnectionResponse() { - this.dataFlow = null; - this.nodeIdentifier = null; - this.tryLaterSeconds = 0; - this.blockedByFirewall = true; - this.primary = false; - this.managerRemoteInputPort = null; - this.managerRemoteCommsSecure = null; - this.instanceId = null; - } 
- - public static ConnectionResponse createBlockedByFirewallResponse() { - return new ConnectionResponse(); - } - - public boolean isPrimary() { - return primary; - } - - public boolean shouldTryLater() { - return tryLaterSeconds > 0; - } - - public boolean isBlockedByFirewall() { - return blockedByFirewall; - } - - public int getTryLaterSeconds() { - return tryLaterSeconds; - } - - public StandardDataFlow getDataFlow() { - return dataFlow; - } - - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public Integer getManagerRemoteInputPort() { - return managerRemoteInputPort; - } - - public Boolean isManagerRemoteCommsSecure() { - return managerRemoteCommsSecure; - } - - public String getInstanceId() { - return instanceId; - } - - public void setClusterManagerDN(final String dn) { - this.clusterManagerDN = dn; - } - - /** - * Returns the DN of the NCM, if it is available or <code>null</code> otherwise. - * - * @return - */ - public String getClusterManagerDN() { - return clusterManagerDN; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java deleted file mode 100644 index 67324a1..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.util.Date; -import javax.xml.bind.annotation.XmlTransient; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter; - -/** - * A heartbeat for indicating the status of a node to the cluster. 
- * @author unattributed - */ -@XmlJavaTypeAdapter(HeartbeatAdapter.class) -public class Heartbeat { - - private final NodeIdentifier nodeIdentifier; - private final boolean primary; - private final boolean connected; - private final long createdTimestamp; - private final byte[] payload; - - public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) { - if(nodeIdentifier == null) { - throw new IllegalArgumentException("Node Identifier may not be null."); - } - this.nodeIdentifier = nodeIdentifier; - this.primary = primary; - this.connected = connected; - this.payload = payload; - this.createdTimestamp = new Date().getTime(); - } - - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public byte[] getPayload() { - return payload; - } - - public boolean isPrimary() { - return primary; - } - - public boolean isConnected() { - return connected; - } - - @XmlTransient - public long getCreatedTimestamp() { - return createdTimestamp; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java deleted file mode 100644 index a120524..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.jaxb.message.NodeBulletinsAdapter; - -/** - * - */ -@XmlJavaTypeAdapter(NodeBulletinsAdapter.class) -public class NodeBulletins { - - private final NodeIdentifier nodeIdentifier; - private final byte[] payload; - - public NodeBulletins(NodeIdentifier nodeIdentifier, byte[] payload) { - this.nodeIdentifier = nodeIdentifier; - this.payload = payload; - } - - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public byte[] getPayload() { - return payload; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java deleted file mode 100644 index 1893186..0000000 --- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.commons.lang3.StringUtils; - -/** - * A node identifier denoting the coordinates of a flow controller that is connected - * to a cluster. Nodes provide an external public API interface and an internal private - * interface for communicating with the cluster. - * - * The external API interface and internal protocol each require an IP or hostname - * as well as a port for communicating. - * - * This class overrides hashCode and equals and considers two instances to be - * equal if they have the equal IDs. 
- * - * @author unattributed - * @Immutable - * @Threadsafe - */ -public class NodeIdentifier { - - /** the unique identifier for the node */ - private final String id; - - /** the IP or hostname to use for sending requests to the node's external interface */ - private final String apiAddress; - - /** the port to use use for sending requests to the node's external interface */ - private final int apiPort; - - /** the IP or hostname to use for sending requests to the node's internal interface */ - private final String socketAddress; - - /** the port to use use for sending requests to the node's internal interface */ - private final int socketPort; - - private final String nodeDn; - - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) { - this(id, apiAddress, apiPort, socketAddress, socketPort, null); - } - - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) { - - if(StringUtils.isBlank(id)) { - throw new IllegalArgumentException("Node ID may not be empty or null."); - } else if(StringUtils.isBlank(apiAddress)) { - throw new IllegalArgumentException("Node API address may not be empty or null."); - } else if(StringUtils.isBlank(socketAddress)) { - throw new IllegalArgumentException("Node socket address may not be empty or null."); - } - - validatePort(apiPort); - validatePort(socketPort); - - this.id = id; - this.apiAddress = apiAddress; - this.apiPort = apiPort; - this.socketAddress = socketAddress; - this.socketPort = socketPort; - this.nodeDn = dn; - } - - public String getId() { - return id; - } - - public String getDN() { - return nodeDn; - } - - public String getApiAddress() { - return apiAddress; - } - - public int getApiPort() { - return apiPort; - } - - public String getSocketAddress() { - return socketAddress; - } - - public int getSocketPort() { - return socketPort; - } - - 
private void validatePort(final int port) { - if(port < 1 || port > 65535) { - throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port); - } - } - - /** - * Compares the id of two node identifiers for equality. - * - * @param obj a node identifier - * - * @return true if the id is equal; false otherwise - */ - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final NodeIdentifier other = (NodeIdentifier) obj; - if ((this.id == null) ? (other.id != null) : !this.id.equals(other.id)) { - return false; - } - return true; - } - - /** - * Compares API address/port and socket address/port for equality. The - * id is not used for comparison. - * - * @param other a node identifier - * - * @return true if API address/port and socket address/port are equal; false - * otherwise - */ - public boolean logicallyEquals(final NodeIdentifier other) { - if(other == null) { - return false; - } - if ((this.apiAddress == null) ? (other.apiAddress != null) : !this.apiAddress.equals(other.apiAddress)) { - return false; - } - if(this.apiPort != other.apiPort) { - return false; - } - if ((this.socketAddress == null) ? (other.socketAddress != null) : !this.socketAddress.equals(other.socketAddress)) { - return false; - } - if(this.socketPort != other.socketPort) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int hash = 7; - hash = 31 * hash + (this.id != null ? 
this.id.hashCode() : 0); - return hash; - } - - @Override - public String toString() { - return "[" + "id=" + id + ", apiAddress=" + apiAddress + ", apiPort=" + apiPort + ", socketAddress=" + socketAddress + ", socketPort=" + socketPort + ']'; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java deleted file mode 100644 index 1edcb91..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; - -/** - * An interface for sending protocol messages from a node to the cluster manager. - * @author unattributed - */ -public interface NodeProtocolSender { - - /** - * Sends a "connection request" message to the cluster manager. - * @param msg a message - * @return the response - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a "heartbeat" message to the cluster manager. - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a bulletins message to the cluster manager. - * @param msg - * @throws ProtocolException - * @throws UnknownServiceAddressException - */ - void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a failure notification if the controller was unable start. 
- * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a failure notification if the node was unable to reconnect to the cluster - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java deleted file mode 100644 index b614e76..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * The context for communicating using the internal cluster protocol. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolContext<T> { - - /** - * Creates a marshaller for serializing protocol messages. - * @return a marshaller - */ - ProtocolMessageMarshaller<T> createMarshaller(); - - /** - * Creates an unmarshaller for deserializing protocol messages. - * @return a unmarshaller - */ - ProtocolMessageUnmarshaller<T> createUnmarshaller(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java deleted file mode 100644 index f11ad84..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * The base exception for problems encountered while communicating within the - * cluster. - * @author unattributed - */ -public class ProtocolException extends RuntimeException { - - public ProtocolException() { - } - - public ProtocolException(String msg) { - super(msg); - } - - public ProtocolException(Throwable cause) { - super(cause); - } - - public ProtocolException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java deleted file mode 100644 index 6de87db..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java +++ 
/dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * A handler for processing protocol messages. - * @author unattributed - */ -public interface ProtocolHandler { - - /** - * Handles the given protocol message or throws an exception if it cannot - * handle the message. If no response is needed by the protocol, then null - * should be returned. 
- * - * @param msg a message - * @return a response or null, if no response is necessary - * - * @throws ProtocolException if the message could not be processed - */ - ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException; - - /** - * @param msg - * @return true if the handler can process the given message; false otherwise - */ - boolean canHandle(ProtocolMessage msg); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java deleted file mode 100644 index 32f0f5d..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.reporting.BulletinRepository; - -/** - * Defines the interface for a listener to process protocol messages. - * @author unattributed - */ -public interface ProtocolListener { - - /** - * Starts the instance for listening for messages. Start may only be called - * if the instance is not running. - * @throws java.io.IOException - */ - void start() throws IOException; - - /** - * Stops the instance from listening for messages. Stop may only be called - * if the instance is running. - * @throws java.io.IOException - */ - void stop() throws IOException; - - /** - * @return true if the instance is started; false otherwise. - */ - boolean isRunning(); - - /** - * @return the handlers registered with the listener - */ - Collection<ProtocolHandler> getHandlers(); - - /** - * Registers a handler with the listener. - * @param handler a handler - */ - void addHandler(ProtocolHandler handler); - - /** - * Sets the BulletinRepository that can be used to report bulletins - * @param bulletinRepository - */ - void setBulletinRepository(BulletinRepository bulletinRepository); - - /** - * Unregisters the handler with the listener. 
- * @param handler a handler - * @return true if the handler was removed; false otherwise - */ - boolean removeHandler(ProtocolHandler handler); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java deleted file mode 100644 index bb436e0..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Defines a marshaller for serializing protocol messages. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolMessageMarshaller<T> { - - /** - * Serializes the given message to the given output stream. - * @param msg a message - * @param os an output stream - * @throws IOException if the message could not be serialized to the stream - */ - void marshal(T msg, OutputStream os) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java deleted file mode 100644 index c690e7b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Defines an unmarshaller for deserializing protocol messages. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolMessageUnmarshaller<T> { - - /** - * Deserializes a message on the given input stream. - * @param is an input stream - * @return - * @throws IOException if the message could not be deserialized from the stream - */ - T unmarshal(InputStream is) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java deleted file mode 100644 index c2d16fc..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.DataFlow; -import java.io.Serializable; -import java.util.Arrays; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter; - -/** - * Represents a dataflow, which includes the raw bytes of the flow.xml and - * whether processors should be started automatically at application startup. - */ -@XmlJavaTypeAdapter(DataFlowAdapter.class) -public class StandardDataFlow implements Serializable, DataFlow { - - private final byte[] flow; - private final byte[] templateBytes; - private final byte[] snippetBytes; - - private boolean autoStartProcessors; - - /** - * Constructs an instance. 
- * - * @param flow a valid flow as bytes, which cannot be null - * @param templateBytes an XML representation of templates - * @param snippetBytes an XML representation of snippets - * - * @throws NullPointerException if any argument is null - */ - public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) { - this.flow = flow; - this.templateBytes = templateBytes; - this.snippetBytes = snippetBytes; - } - - public StandardDataFlow(final DataFlow toCopy) { - this.flow = copy(toCopy.getFlow()); - this.templateBytes = copy(toCopy.getTemplates()); - this.snippetBytes = copy(toCopy.getSnippets()); - this.autoStartProcessors = toCopy.isAutoStartProcessors(); - } - - private static byte[] copy(final byte[] bytes) { - return bytes == null ? null : Arrays.copyOf(bytes, bytes.length); - } - - /** - * @return the raw byte array of the flow - */ - public byte[] getFlow() { - return flow; - } - - /** - * @return the raw byte array of the templates - */ - public byte[] getTemplates() { - return templateBytes; - } - - /** - * @return the raw byte array of the snippets - */ - public byte[] getSnippets() { - return snippetBytes; - } - - /** - * @return true if processors should be automatically started at application - * startup; false otherwise - */ - public boolean isAutoStartProcessors() { - return autoStartProcessors; - } - - /** - * - * Sets the flag to automatically start processors at application startup. 
- * - * @param autoStartProcessors true if processors should be automatically - * started at application startup; false otherwise - */ - public void setAutoStartProcessors(final boolean autoStartProcessors) { - this.autoStartProcessors = autoStartProcessors; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java deleted file mode 100644 index 41c74eb..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -/** - * Represents the exceptional case when a service's address is not known. - * @author unattributed - */ -public class UnknownServiceAddressException extends RuntimeException { - - public UnknownServiceAddressException() { - } - - public UnknownServiceAddressException(String msg) { - super(msg); - } - - public UnknownServiceAddressException(Throwable cause) { - super(cause); - } - - public UnknownServiceAddressException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java deleted file mode 100644 index ceb3fcb..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.io.socket.SocketUtils; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.util.FormatUtils; - -/** - * A protocol sender for sending protocol messages from the cluster manager to - * nodes. 
- * - * Connection-type requests (e.g., reconnection, disconnection) by nature of - * starting/stopping flow controllers take longer than other types of protocol - * messages. Therefore, a handshake timeout may be specified to lengthen the - * allowable time for communication with the node. - * - * @author unattributed - */ -public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender { - - - private final ProtocolContext<ProtocolMessage> protocolContext; - private final SocketConfiguration socketConfiguration; - private int handshakeTimeoutSeconds; - private volatile BulletinRepository bulletinRepository; - - public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - if(socketConfiguration == null) { - throw new IllegalArgumentException("Socket configuration may not be null."); - } else if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - this.socketConfiguration = socketConfiguration; - this.protocolContext = protocolContext; - this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - /** - * Requests the data flow from a node. 
- * @param msg a message - * @return the message response - * @throws ProtocolException if the message failed to be sent or the response was malformed - */ - @Override - public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), false); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - final ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.FLOW_RESPONSE == response.getType()) { - return (FlowResponseMessage) response; - } else { - throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); - } - - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Requests a node to reconnect to the cluster. The configured value for - * handshake timeout is applied to the socket before making the request.
- * @param msg a message - * @return the response - * @throws ProtocolException if the message failed to be sent or the response was malformed - */ - @Override - public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - // marshal message to output stream - try { - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - - final ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.RECONNECTION_RESPONSE == response.getType()) { - return (ReconnectionResponseMessage) response; - } else { - throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Requests a node to disconnect from the cluster. The configured value for - * handshake timeout is applied to the socket before making the request. 
- * @param msg a message - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void disconnect(final DisconnectMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - // marshal message to output stream - try { - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Assigns the primary role to a node. - * - * @param msg a message - * - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - - private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { - // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout - if(handshakeTimeoutSeconds >= 0) { - socket.setSoTimeout(handshakeTimeoutSeconds * 1000); - } - } - - public SocketConfiguration getSocketConfiguration() { - return socketConfiguration; - } - - public int getHandshakeTimeoutSeconds() { - return handshakeTimeoutSeconds; - } - - public void setHandshakeTimeout(final String handshakeTimeout) { - 
this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(handshakeTimeout, TimeUnit.SECONDS); - } - - private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) { - return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout); - } - - private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) { - try { - // create a socket - final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration); - if ( applyHandshakeTimeout ) { - setConnectionHandshakeTimeoutOnSocket(socket); - } - return socket; - } catch(final IOException ioe) { - throw new ProtocolException("Failed to create socket due to: " + ioe, ioe); - } - } -}
