http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java index 7a5ff2b..4e06926 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java @@ -21,13 +21,12 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter; /** - * The cluster manager's response to a node's connection request. If the manager - * has a current copy of the data flow, then it is returned with a node identifier - * to the node. Otherwise, the manager will provide a "try again in X seconds" - * response to the node in hopes that a current data flow will be available upon - * subsequent requests. - * - * @author unattributed + * The cluster manager's response to a node's connection request. If the manager + * has a current copy of the data flow, then it is returned with a node + * identifier to the node. Otherwise, the manager will provide a "try again in X + * seconds" response to the node in hopes that a current data flow will be + * available upon subsequent requests. + * */ @XmlJavaTypeAdapter(ConnectionResponseAdapter.class) public class ConnectionResponse { @@ -40,14 +39,14 @@ public class ConnectionResponse { private final Integer managerRemoteInputPort; private final Boolean managerRemoteCommsSecure; private final String instanceId; - + private volatile String clusterManagerDN; - - public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, - final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { - if(nodeIdentifier == null) { + + public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, + final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { + if (nodeIdentifier == null) { throw new IllegalArgumentException("Node identifier may not be empty or null."); - } else if(dataFlow == null) { + } else if (dataFlow == null) { throw new IllegalArgumentException("DataFlow may not be null."); } this.nodeIdentifier = nodeIdentifier; @@ -59,9 +58,9 @@ public class ConnectionResponse { this.managerRemoteCommsSecure = managerRemoteCommsSecure; this.instanceId = instanceId; } - + public ConnectionResponse(final int tryLaterSeconds) { - if(tryLaterSeconds <= 0) { + if (tryLaterSeconds <= 0) { throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds); } this.dataFlow = null; @@ -84,19 +83,19 @@ public class ConnectionResponse { this.managerRemoteCommsSecure = null; this.instanceId = null; } - + public static ConnectionResponse createBlockedByFirewallResponse() { return new ConnectionResponse(); } - + public boolean isPrimary() { return primary; } - + public boolean shouldTryLater() { return tryLaterSeconds > 0; } - + public boolean isBlockedByFirewall() { return blockedByFirewall; } @@ -104,11 +103,11 @@ public class ConnectionResponse { public int getTryLaterSeconds() { return tryLaterSeconds; } - + public StandardDataFlow getDataFlow() { return dataFlow; } - + public NodeIdentifier getNodeIdentifier() { return nodeIdentifier; } @@ -116,23 +115,22 @@ public class ConnectionResponse { public Integer getManagerRemoteInputPort() { return managerRemoteInputPort; } - + public Boolean isManagerRemoteCommsSecure() { return managerRemoteCommsSecure; } - + public String getInstanceId() { return instanceId; } - + public void setClusterManagerDN(final String dn) { this.clusterManagerDN = dn; } - + /** - * Returns the DN of the NCM, if it is available or <code>null</code> otherwise. - * - * @return + * @return the DN of the NCM, if it is available or <code>null</code> + * otherwise */ public String getClusterManagerDN() { return clusterManagerDN;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java index 67324a1..04fb3f0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java @@ -23,44 +23,45 @@ import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter; /** * A heartbeat for indicating the status of a node to the cluster. + * * @author unattributed */ @XmlJavaTypeAdapter(HeartbeatAdapter.class) public class Heartbeat { - + private final NodeIdentifier nodeIdentifier; private final boolean primary; private final boolean connected; private final long createdTimestamp; private final byte[] payload; - + public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) { - if(nodeIdentifier == null) { + if (nodeIdentifier == null) { throw new IllegalArgumentException("Node Identifier may not be null."); - } + } this.nodeIdentifier = nodeIdentifier; this.primary = primary; this.connected = connected; this.payload = payload; this.createdTimestamp = new Date().getTime(); } - + public NodeIdentifier getNodeIdentifier() { return nodeIdentifier; } - + public byte[] getPayload() { return payload; } - + public boolean isPrimary() { return primary; } - + public boolean isConnected() { return connected; } - + @XmlTransient public long getCreatedTimestamp() { return createdTimestamp; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java index a120524..86df107 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java @@ -40,5 +40,5 @@ public class NodeBulletins { public byte[] getPayload() { return payload; } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java index 1893186..4b10be6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java @@ -19,56 +19,68 @@ package org.apache.nifi.cluster.protocol; import org.apache.commons.lang3.StringUtils; /** - * A node identifier denoting the coordinates of a flow controller that is connected - * to a cluster. Nodes provide an external public API interface and an internal private - * interface for communicating with the cluster. - * - * The external API interface and internal protocol each require an IP or hostname - * as well as a port for communicating. - * + * A node identifier denoting the coordinates of a flow controller that is + * connected to a cluster. Nodes provide an external public API interface and an + * internal private interface for communicating with the cluster. + * + * The external API interface and internal protocol each require an IP or + * hostname as well as a port for communicating. + * * This class overrides hashCode and equals and considers two instances to be * equal if they have the equal IDs. - * + * * @author unattributed * @Immutable * @Threadsafe */ public class NodeIdentifier { - - /** the unique identifier for the node */ + + /** + * the unique identifier for the node + */ private final String id; - - /** the IP or hostname to use for sending requests to the node's external interface */ + + /** + * the IP or hostname to use for sending requests to the node's external + * interface + */ private final String apiAddress; - - /** the port to use use for sending requests to the node's external interface */ - private final int apiPort; - - /** the IP or hostname to use for sending requests to the node's internal interface */ + + /** + * the port to use use for sending requests to the node's external interface + */ + private final int apiPort; + + /** + * the IP or hostname to use for sending requests to the node's internal + * interface + */ private final String socketAddress; - - /** the port to use use for sending requests to the node's internal interface */ + + /** + * the port to use use for sending requests to the node's internal interface + */ private final int socketPort; - + private final String nodeDn; public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) { this(id, apiAddress, apiPort, socketAddress, socketPort, null); } - + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) { - - if(StringUtils.isBlank(id)) { + + if (StringUtils.isBlank(id)) { throw new IllegalArgumentException("Node ID may not be empty or null."); - } else if(StringUtils.isBlank(apiAddress)) { + } else if (StringUtils.isBlank(apiAddress)) { throw new IllegalArgumentException("Node API address may not be empty or null."); - } else if(StringUtils.isBlank(socketAddress)) { + } else if (StringUtils.isBlank(socketAddress)) { throw new IllegalArgumentException("Node socket address may not be empty or null."); - } - + } + validatePort(apiPort); validatePort(socketPort); - + this.id = id; this.apiAddress = apiAddress; this.apiPort = apiPort; @@ -80,11 +92,11 @@ public class NodeIdentifier { public String getId() { return id; } - + public String getDN() { return nodeDn; } - + public String getApiAddress() { return apiAddress; } @@ -96,22 +108,22 @@ public class NodeIdentifier { public String getSocketAddress() { return socketAddress; } - + public int getSocketPort() { return socketPort; } - + private void validatePort(final int port) { - if(port < 1 || port > 65535) { + if (port < 1 || port > 65535) { throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port); - } + } } - + /** * Compares the id of two node identifiers for equality. - * + * * @param obj a node identifier - * + * * @return true if the id is equal; false otherwise */ @Override @@ -130,33 +142,33 @@ public class NodeIdentifier { } /** - * Compares API address/port and socket address/port for equality. The - * id is not used for comparison. - * + * Compares API address/port and socket address/port for equality. The id is + * not used for comparison. + * * @param other a node identifier - * + * * @return true if API address/port and socket address/port are equal; false * otherwise */ public boolean logicallyEquals(final NodeIdentifier other) { - if(other == null) { + if (other == null) { return false; } if ((this.apiAddress == null) ? (other.apiAddress != null) : !this.apiAddress.equals(other.apiAddress)) { return false; } - if(this.apiPort != other.apiPort) { + if (this.apiPort != other.apiPort) { return false; } if ((this.socketAddress == null) ? (other.socketAddress != null) : !this.socketAddress.equals(other.socketAddress)) { return false; } - if(this.socketPort != other.socketPort) { + if (this.socketPort != other.socketPort) { return false; } return true; } - + @Override public int hashCode() { int hash = 7; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java index 1edcb91..f3e5df4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java @@ -24,50 +24,61 @@ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; /** - * An interface for sending protocol messages from a node to the cluster manager. - * @author unattributed + * An interface for sending protocol messages from a node to the cluster + * manager. + * */ public interface NodeProtocolSender { - + /** * Sends a "connection request" message to the cluster manager. + * * @param msg a message * @return the response - * @throws UnknownServiceAddressException if the cluster manager's address is not known + * @throws UnknownServiceAddressException if the cluster manager's address + * is not known * @throws ProtocolException if communication failed */ ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException; - + /** * Sends a "heartbeat" message to the cluster manager. + * * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known + * @throws UnknownServiceAddressException if the cluster manager's address + * is not known * @throws ProtocolException if communication failed */ void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException; - + /** * Sends a bulletins message to the cluster manager. - * @param msg - * @throws ProtocolException - * @throws UnknownServiceAddressException + * + * @param msg a message + * @throws ProtocolException pe + * @throws UnknownServiceAddressException ex */ void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException; - + /** * Sends a failure notification if the controller was unable start. + * * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known + * @throws UnknownServiceAddressException if the cluster manager's address + * is not known * @throws ProtocolException if communication failed */ void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - + /** - * Sends a failure notification if the node was unable to reconnect to the cluster + * Sends a failure notification if the node was unable to reconnect to the + * cluster + * * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known + * @throws UnknownServiceAddressException if the cluster manager's address + * is not known * @throws ProtocolException if communication failed */ void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java index b614e76..11a3912 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java @@ -17,22 +17,24 @@ package org.apache.nifi.cluster.protocol; /** - * The context for communicating using the internal cluster protocol. - * + * The context for communicating using the internal cluster protocol. + * * @param <T> The type of protocol message. - * + * * @author unattributed */ public interface ProtocolContext<T> { - + /** * Creates a marshaller for serializing protocol messages. + * * @return a marshaller */ ProtocolMessageMarshaller<T> createMarshaller(); - + /** * Creates an unmarshaller for deserializing protocol messages. + * * @return a unmarshaller */ ProtocolMessageUnmarshaller<T> createUnmarshaller(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java index f11ad84..b6c3737 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java @@ -19,21 +19,22 @@ package org.apache.nifi.cluster.protocol; /** * The base exception for problems encountered while communicating within the * cluster. + * * @author unattributed */ public class ProtocolException extends RuntimeException { - + public ProtocolException() { } - + public ProtocolException(String msg) { super(msg); } - + public ProtocolException(Throwable cause) { super(cause); } - + public ProtocolException(String msg, Throwable cause) { super(msg, cause); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java index 6de87db..b2bace9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java @@ -20,25 +20,26 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage; /** * A handler for processing protocol messages. - * @author unattributed + * */ public interface ProtocolHandler { - + /** * Handles the given protocol message or throws an exception if it cannot - * handle the message. If no response is needed by the protocol, then null + * handle the message. If no response is needed by the protocol, then null * should be returned. - * + * * @param msg a message * @return a response or null, if no response is necessary - * + * * @throws ProtocolException if the message could not be processed */ ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException; - + /** - * @param msg - * @return true if the handler can process the given message; false otherwise + * @param msg a message + * @return true if the handler can process the given message; false + * otherwise */ boolean canHandle(ProtocolMessage msg); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java index 32f0f5d..2f35241 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java @@ -23,48 +23,53 @@ import org.apache.nifi.reporting.BulletinRepository; /** * Defines the interface for a listener to process protocol messages. - * @author unattributed + * */ public interface ProtocolListener { - + /** - * Starts the instance for listening for messages. Start may only be called + * Starts the instance for listening for messages. Start may only be called * if the instance is not running. - * @throws java.io.IOException + * + * @throws java.io.IOException ex */ void start() throws IOException; - + /** - * Stops the instance from listening for messages. Stop may only be called + * Stops the instance from listening for messages. Stop may only be called * if the instance is running. - * @throws java.io.IOException + * + * @throws java.io.IOException ex */ void stop() throws IOException; - + /** * @return true if the instance is started; false otherwise. */ boolean isRunning(); - + /** * @return the handlers registered with the listener */ Collection<ProtocolHandler> getHandlers(); - + /** * Registers a handler with the listener. + * * @param handler a handler */ void addHandler(ProtocolHandler handler); - + /** * Sets the BulletinRepository that can be used to report bulletins - * @param bulletinRepository + * + * @param bulletinRepository repo */ void setBulletinRepository(BulletinRepository bulletinRepository); - + /** * Unregisters the handler with the listener. + * * @param handler a handler * @return true if the handler was removed; false otherwise */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java index bb436e0..4e43d4d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java @@ -21,15 +21,16 @@ import java.io.OutputStream; /** * Defines a marshaller for serializing protocol messages. - * + * * @param <T> The type of protocol message. - * + * * @author unattributed */ public interface ProtocolMessageMarshaller<T> { - + /** * Serializes the given message to the given output stream. + * * @param msg a message * @param os an output stream * @throws IOException if the message could not be serialized to the stream http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java index c690e7b..e8910bd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java @@ -21,18 +21,19 @@ import java.io.InputStream; /** * Defines an unmarshaller for deserializing protocol messages. - * + * * @param <T> The type of protocol message. - * - * @author unattributed + * */ public interface ProtocolMessageUnmarshaller<T> { - + /** * Deserializes a message on the given input stream. + * * @param is an input stream - * @return - * @throws IOException if the message could not be deserialized from the stream + * @return deserialized message + * @throws IOException if the message could not be deserialized from the + * stream */ T unmarshal(InputStream is) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java index c2d16fc..0f0ed69 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java @@ -25,25 +25,25 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter; /** - * Represents a dataflow, which includes the raw bytes of the flow.xml and + * Represents a dataflow, which includes the raw bytes of the flow.xml and * whether processors should be started automatically at application startup. */ @XmlJavaTypeAdapter(DataFlowAdapter.class) public class StandardDataFlow implements Serializable, DataFlow { - + private final byte[] flow; private final byte[] templateBytes; private final byte[] snippetBytes; private boolean autoStartProcessors; - + /** - * Constructs an instance. - * + * Constructs an instance. + * * @param flow a valid flow as bytes, which cannot be null * @param templateBytes an XML representation of templates * @param snippetBytes an XML representation of snippets - * + * * @throws NullPointerException if any argument is null */ public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) { @@ -51,20 +51,20 @@ public class StandardDataFlow implements Serializable, DataFlow { this.templateBytes = templateBytes; this.snippetBytes = snippetBytes; } - + public StandardDataFlow(final DataFlow toCopy) { this.flow = copy(toCopy.getFlow()); this.templateBytes = copy(toCopy.getTemplates()); this.snippetBytes = copy(toCopy.getSnippets()); this.autoStartProcessors = toCopy.isAutoStartProcessors(); } - + private static byte[] copy(final byte[] bytes) { return bytes == null ? null : Arrays.copyOf(bytes, bytes.length); } - + /** - * @return the raw byte array of the flow + * @return the raw byte array of the flow */ public byte[] getFlow() { return flow; @@ -76,26 +76,26 @@ public class StandardDataFlow implements Serializable, DataFlow { public byte[] getTemplates() { return templateBytes; } - + /** * @return the raw byte array of the snippets */ public byte[] getSnippets() { return snippetBytes; } - + /** - * @return true if processors should be automatically started at application - * startup; false otherwise + * @return true if processors should be automatically started at application + * startup; false otherwise */ public boolean isAutoStartProcessors() { return autoStartProcessors; } - + /** - * + * * Sets the flag to automatically start processors at application startup. - * + * * @param autoStartProcessors true if processors should be automatically * started at application startup; false otherwise */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java index 41c74eb..dc22ba0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java @@ -18,21 +18,22 @@ package org.apache.nifi.cluster.protocol; /** * Represents the exceptional case when a service's address is not known. + * * @author unattributed */ public class UnknownServiceAddressException extends RuntimeException { - + public UnknownServiceAddressException() { } - + public UnknownServiceAddressException(String msg) { super(msg); } - + public UnknownServiceAddressException(Throwable cause) { super(cause); } - + public UnknownServiceAddressException(String msg, Throwable cause) { super(msg, cause); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java index ceb3fcb..636a6d3 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java @@ -43,34 +43,32 @@ import org.apache.nifi.util.FormatUtils; /** * A protocol sender for sending protocol messages from the cluster manager to - * nodes. - * - * Connection-type requests (e.g., reconnection, disconnection) by nature of - * starting/stopping flow controllers take longer than other types of protocol - * messages. Therefore, a handshake timeout may be specified to lengthen the + * nodes. + * + * Connection-type requests (e.g., reconnection, disconnection) by nature of + * starting/stopping flow controllers take longer than other types of protocol + * messages. Therefore, a handshake timeout may be specified to lengthen the * allowable time for communication with the node. - * - * @author unattributed + * */ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender { - private final ProtocolContext<ProtocolMessage> protocolContext; private final SocketConfiguration socketConfiguration; private int handshakeTimeoutSeconds; private volatile BulletinRepository bulletinRepository; public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - if(socketConfiguration == null) { + if (socketConfiguration == null) { throw new IllegalArgumentException("Socket configuration may not be null."); - } else if(protocolContext == null) { + } else if (protocolContext == null) { throw new IllegalArgumentException("Protocol Context may not be null."); } this.socketConfiguration = socketConfiguration; this.protocolContext = protocolContext; this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured } - + @Override public void setBulletinRepository(final BulletinRepository bulletinRepository) { this.bulletinRepository = bulletinRepository; @@ -78,76 +76,79 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS /** * Requests the data flow from a node. + * * @param msg a message * @return the message response - * @throws @throws ProtocolException if the message failed to be sent or the response was malformed + * @throws ProtocolException if the message failed to be sent or the + * response was malformed */ @Override public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { Socket socket = null; try { - socket = createSocket(msg.getNodeId(), false); - + socket = createSocket(msg.getNodeId(), false); + try { // marshal message to output stream final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); } - + final ProtocolMessage response; try { // unmarshall response and return final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.FLOW_RESPONSE == response.getType()) { + } + + if (MessageType.FLOW_RESPONSE == response.getType()) { return (FlowResponseMessage) response; } else { throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); } - + } finally { SocketUtils.closeQuietly(socket); } } /** - * Requests a node to reconnect to the cluster. The configured value for + * Requests a node to reconnect to the cluster. The configured value for * handshake timeout is applied to the socket before making the request. + * * @param msg a message * @return the response - * @throws ProtocolException if the message failed to be sent or the response was malformed + * @throws ProtocolException if the message failed to be sent or the + * response was malformed */ @Override public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { Socket socket = null; try { - socket = createSocket(msg.getNodeId(), true); + socket = createSocket(msg.getNodeId(), true); // marshal message to output stream try { final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); } - - + final ProtocolMessage response; try { // unmarshall response and return final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.RECONNECTION_RESPONSE == response.getType()) { + } + + if (MessageType.RECONNECTION_RESPONSE == response.getType()) { return (ReconnectionResponseMessage) response; } else { throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); @@ -156,10 +157,11 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS SocketUtils.closeQuietly(socket); } } - + /** - * Requests a node to disconnect from the cluster. The configured value for + * Requests a node to disconnect from the cluster. The configured value for * handshake timeout is applied to the socket before making the request. + * * @param msg a message * @throws ProtocolException if the message failed to be sent */ @@ -167,13 +169,13 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS public void disconnect(final DisconnectMessage msg) throws ProtocolException { Socket socket = null; try { - socket = createSocket(msg.getNodeId(), true); + socket = createSocket(msg.getNodeId(), true); // marshal message to output stream try { final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); } } finally { @@ -183,37 +185,36 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS /** * Assigns the primary role to a node. - * + * * @param msg a message - * + * * @throws ProtocolException if the message failed to be sent */ @Override public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException { Socket socket = null; try { - socket = createSocket(msg.getNodeId(), true); + socket = createSocket(msg.getNodeId(), true); try { // marshal message to output stream final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); } } finally { SocketUtils.closeQuietly(socket); } } - - + private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout - if(handshakeTimeoutSeconds >= 0) { + if (handshakeTimeoutSeconds >= 0) { socket.setSoTimeout(handshakeTimeoutSeconds * 1000); - } + } } - + public SocketConfiguration getSocketConfiguration() { return socketConfiguration; } @@ -227,18 +228,18 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS } private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) { - return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout); + return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout); } - + private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) { - try { + try { // create a socket final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration); - if ( applyHandshakeTimeout ) { - setConnectionHandshakeTimeoutOnSocket(socket); + if (applyHandshakeTimeout) { + setConnectionHandshakeTimeoutOnSocket(socket); } return socket; - } catch(final IOException ioe) { + } catch (final IOException ioe) { throw new ProtocolException("Failed to create socket due to: " + ioe, ioe); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java index 933e5fa..2837f1a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java @@ -32,21 +32,21 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.reporting.BulletinRepository; /** - * A wrapper class for consolidating a protocol sender and listener for the cluster - * manager. - * + * A wrapper class for consolidating a protocol sender and listener for the + * cluster manager. + * * @author unattributed */ public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener { - + private final ClusterManagerProtocolSender sender; - + private final ProtocolListener listener; - + public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) { - if(sender == null) { + if (sender == null) { throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null."); - } else if(listener == null) { + } else if (listener == null) { throw new IllegalArgumentException("ProtocolListener may not be null."); } this.sender = sender; @@ -55,7 +55,7 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto @Override public void stop() throws IOException { - if(!isRunning()) { + if (!isRunning()) { throw new IllegalStateException("Instance is already stopped."); } listener.stop(); @@ -63,7 +63,7 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto @Override public void start() throws IOException { - if(isRunning()) { + if (isRunning()) { throw new IllegalStateException("Instance is already started."); } listener.start(); @@ -88,13 +88,13 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto public void addHandler(final ProtocolHandler handler) { listener.addHandler(handler); } - + @Override public void setBulletinRepository(final BulletinRepository bulletinRepository) { listener.setBulletinRepository(bulletinRepository); sender.setBulletinRepository(bulletinRepository); } - + @Override public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { return sender.requestFlow(msg); @@ -109,10 +109,10 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto public void disconnect(DisconnectMessage msg) throws ProtocolException { sender.disconnect(msg); } - + @Override public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException { sender.assignPrimaryRole(msg); } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java index 24e51e0..f808c83 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; * discovery. The instance must be stopped before termination of the JVM to * ensure proper resource clean-up. * - * @author unattributed */ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener { @@ -60,7 +59,6 @@ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, Proto */ private DiscoverableService service; - public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress, final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { @@ -162,7 +160,8 @@ public class ClusterServiceDiscovery implements MulticastServiceDiscovery, Proto || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) { service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort())); final InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress(); - logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress()))); + logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", + serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress()))); } } return null; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java index bebfde8..64ca7fa 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java @@ -27,39 +27,39 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Implements the ServiceLocator interface for locating the socket address - * of a cluster service. Depending on configuration, the address may be located - * using service discovery. If using service discovery, then the service methods - * must be used for starting and stopping discovery. - * - * Service discovery may be used in conjunction with a fixed port. In this case, - * the service discovery will yield the service IP/host while the fixed port will - * be used for the port. - * + * Implements the ServiceLocator interface for locating the socket address of a + * cluster service. Depending on configuration, the address may be located using + * service discovery. If using service discovery, then the service methods must + * be used for starting and stopping discovery. + * + * Service discovery may be used in conjunction with a fixed port. In this case, + * the service discovery will yield the service IP/host while the fixed port + * will be used for the port. + * * Alternatively, the instance may be configured with exact service location, in - * which case, no service discovery occurs and the caller will always receive the - * configured service. - * + * which case, no service discovery occurs and the caller will always receive + * the configured service. + * * @author unattributed */ public class ClusterServiceLocator implements ServiceDiscovery { - + private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class); - + private final String serviceName; - + private final ClusterServiceDiscovery serviceDiscovery; - + private final DiscoverableService fixedService; private final int fixedServicePort; - + private final AttemptsConfig attemptsConfig = new AttemptsConfig(); - + private final AtomicBoolean running = new AtomicBoolean(false); - + public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) { - if(serviceDiscovery == null) { + if (serviceDiscovery == null) { throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); } this.serviceDiscovery = serviceDiscovery; @@ -67,9 +67,9 @@ public class ClusterServiceLocator implements ServiceDiscovery { this.fixedServicePort = 0; this.serviceName = serviceDiscovery.getServiceName(); } - + public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) { - if(serviceDiscovery == null) { + if (serviceDiscovery == null) { throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); } this.serviceDiscovery = serviceDiscovery; @@ -77,9 +77,9 @@ public class ClusterServiceLocator implements ServiceDiscovery { this.fixedServicePort = fixedServicePort; this.serviceName = serviceDiscovery.getServiceName(); } - + public ClusterServiceLocator(final DiscoverableService fixedService) { - if(fixedService == null) { + if (fixedService == null) { throw new IllegalArgumentException("Service may not be null."); } this.serviceDiscovery = null; @@ -87,30 +87,30 @@ public class ClusterServiceLocator implements ServiceDiscovery { this.fixedServicePort = 0; this.serviceName = fixedService.getServiceName(); } - + @Override public DiscoverableService getService() { - + final int numAttemptsValue; final int secondsBetweenAttempts; - synchronized(this) { + synchronized (this) { numAttemptsValue = attemptsConfig.numAttempts; secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts(); } - + // try for a configured amount of attempts to retrieve the service address - for(int i = 0; i < numAttemptsValue; i++) { + for (int i = 0; i < numAttemptsValue; i++) { - if(fixedService != null) { + if (fixedService != null) { return fixedService; - } else if(serviceDiscovery != null) { - + } else if (serviceDiscovery != null) { + final DiscoverableService discoveredService = serviceDiscovery.getService(); - + // if we received an address - if(discoveredService != null) { + if (discoveredService != null) { // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address - if(fixedServicePort > 0) { + if (fixedServicePort > 0) { // create service using discovered service name and address with fixed service port final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort); final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr); @@ -120,23 +120,23 @@ public class ClusterServiceLocator implements ServiceDiscovery { } } } - + // could not obtain service address, so sleep a bit try { - logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.", - serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts)); + logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.", + serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts)); Thread.sleep(secondsBetweenAttempts * 1000); - } catch(final InterruptedException ie) { + } catch (final InterruptedException ie) { break; } - + } return null; } public boolean isRunning() { - if(serviceDiscovery != null) { + if (serviceDiscovery != null) { return serviceDiscovery.isRunning(); } else { return running.get(); @@ -144,31 +144,31 @@ public class ClusterServiceLocator implements ServiceDiscovery { } public void start() throws IOException { - - if(isRunning()) { + + if (isRunning()) { throw new IllegalStateException("Instance is already started."); } - - if(serviceDiscovery != null) { + + if (serviceDiscovery != null) { serviceDiscovery.start(); } running.set(true); } public void stop() throws IOException { - - if(isRunning() == false) { + + if (isRunning() == false) { throw new IllegalStateException("Instance is already stopped."); } - - if(serviceDiscovery != null) { + + if (serviceDiscovery != null) { serviceDiscovery.stop(); } running.set(false); } - + public synchronized void setAttemptsConfig(final AttemptsConfig config) { - if(config == null) { + if (config == null) { throw new IllegalArgumentException("Attempts configuration may not be null."); } this.attemptsConfig.numAttempts = config.numAttempts; @@ -183,21 +183,21 @@ public class ClusterServiceLocator implements ServiceDiscovery { config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit; return config; } - + public static class AttemptsConfig { - + private int numAttempts = 1; - + private int timeBetweenAttempts = 1; - + private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS; - + public int getNumAttempts() { return numAttempts; } public void setNumAttempts(int numAttempts) { - if(numAttempts <= 0) { + if (numAttempts <= 0) { throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts); } this.numAttempts = numAttempts; @@ -208,9 +208,9 @@ public class ClusterServiceLocator implements ServiceDiscovery { } public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) { - if(timeBetweenAttempts <= 0) { + if (timeBetweenAttempts <= 0) { throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } + } this.timeBetweenAttempsUnit = timeBetweenAttempsUnit; } @@ -219,9 +219,9 @@ public class ClusterServiceLocator implements ServiceDiscovery { } public void setTimeBetweenAttempts(int timeBetweenAttempts) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } + if (timeBetweenAttempts <= 0) { + throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); + } this.timeBetweenAttempts = timeBetweenAttempts; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java index e9e7d5b..3458760 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java @@ -21,7 +21,10 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.InetSocketAddress; import java.net.MulticastSocket; -import java.util.*; +import java.util.Collections; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; @@ -39,75 +42,76 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Broadcasts services used by the clustering software using multicast communication. - * A configurable delay occurs after broadcasting the collection of services. - * + * Broadcasts services used by the clustering software using multicast + * communication. A configurable delay occurs after broadcasting the collection + * of services. + * * The client caller is responsible for starting and stopping the broadcasting. * The instance must be stopped before termination of the JVM to ensure proper * resource clean-up. - * + * * @author unattributed */ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster { - + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class)); - + private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>(); private final InetSocketAddress multicastAddress; - + private final MulticastConfiguration multicastConfiguration; - + private final ProtocolContext<ProtocolMessage> protocolContext; - + private final int broadcastDelayMs; - + private Timer broadcaster; - + private MulticastSocket multicastSocket; - - public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, - final MulticastConfiguration multicastConfiguration, + + public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, + final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) { - - if(multicastAddress == null) { + + if (multicastAddress == null) { throw new IllegalArgumentException("Multicast address may not be null."); - } else if(multicastAddress.getAddress().isMulticastAddress() == false) { + } else if (multicastAddress.getAddress().isMulticastAddress() == false) { throw new IllegalArgumentException("Multicast group address is not a Class D IP address."); - } else if(protocolContext == null) { + } else if (protocolContext == null) { throw new IllegalArgumentException("Protocol Context may not be null."); - } else if(multicastConfiguration == null) { + } else if (multicastConfiguration == null) { throw new IllegalArgumentException("Multicast configuration may not be null."); } - + this.services.addAll(services); this.multicastAddress = multicastAddress; this.multicastConfiguration = multicastConfiguration; this.protocolContext = protocolContext; this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS); } - + public void start() throws IOException { - if(isRunning()) { + if (isRunning()) { throw new IllegalStateException("Instance is already started."); } - + // setup socket multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration); - + // setup broadcaster broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true); broadcaster.schedule(new TimerTask() { @Override public void run() { - for(final DiscoverableService service : services) { + for (final DiscoverableService service : services) { try { final InetSocketAddress serviceAddress = service.getServiceAddress(); - logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", - service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort())); - + logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", + service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort())); + // create message final ServiceBroadcastMessage msg = new ServiceBroadcastMessage(); msg.setServiceName(service.getServiceName()); @@ -124,37 +128,37 @@ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress); multicastSocket.send(packet); - } catch(final Exception ex) { + } catch (final Exception ex) { logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex); } } } }, 0, broadcastDelayMs); } - + public boolean isRunning() { return (broadcaster != null); } - + public void stop() { - - if(isRunning() == false) { + + if (isRunning() == false) { throw new IllegalStateException("Instance is already stopped."); } - + broadcaster.cancel(); broadcaster = null; // close socket MulticastUtils.closeQuietly(multicastSocket); - + } @Override public int getBroadcastDelayMs() { return broadcastDelayMs; } - + @Override public Set<DiscoverableService> getServices() { return Collections.unmodifiableSet(services); @@ -164,16 +168,16 @@ public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster public InetSocketAddress getMulticastAddress() { return multicastAddress; } - + @Override public boolean addService(final DiscoverableService service) { return services.add(service); } - + @Override public boolean removeService(final String serviceName) { - for(final DiscoverableService service : services) { - if(service.getServiceName().equals(serviceName)) { + for (final DiscoverableService service : services) { + if (service.getServiceName().equals(serviceName)) { return services.remove(service); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java index 680df65..7ac17ab 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; public class CopyingInputStream extends FilterInputStream { + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final int maxBytesToCopy; private final InputStream in; @@ -32,45 +33,45 @@ public class CopyingInputStream extends FilterInputStream { this.maxBytesToCopy = maxBytesToCopy; this.in = in; } - + @Override public int read() throws IOException { final int delegateRead = in.read(); - if ( delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy ) { + if (delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy) { baos.write(delegateRead); } - + return delegateRead; } - + @Override public int read(byte[] b) throws IOException { final int delegateRead = in.read(b); - if ( delegateRead >= 0 ) { + if (delegateRead >= 0) { baos.write(b, 0, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied())); } - + return delegateRead; } - + @Override public int read(byte[] b, int off, int len) throws IOException { final int delegateRead = in.read(b, off, len); - if ( delegateRead >= 0 ) { + if (delegateRead >= 0) { baos.write(b, off, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied())); } - + return delegateRead; } - + public byte[] getBytesRead() { return baos.toByteArray(); } - + public void writeBytes(final OutputStream out) throws IOException { baos.writeTo(out); } - + public int getNumberOfBytesCopied() { return baos.size(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java index d3764b3..90f9124 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java @@ -45,20 +45,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Implements a listener for protocol messages sent over multicast. If a message + * Implements a listener for protocol messages sent over multicast. If a message * is of type MulticastProtocolMessage, then the underlying protocol message is - * passed to the handler. If the receiving handler produces a message response, - * then the message is wrapped with a MulticastProtocolMessage before being - * sent to the originator. - * - * The client caller is responsible for starting and stopping the listener. - * The instance must be stopped before termination of the JVM to ensure proper + * passed to the handler. If the receiving handler produces a message response, + * then the message is wrapped with a MulticastProtocolMessage before being sent + * to the originator. + * + * The client caller is responsible for starting and stopping the listener. The + * instance must be stopped before termination of the JVM to ensure proper * resource clean-up. - * + * * @author unattributed */ public class MulticastProtocolListener extends MulticastListener implements ProtocolListener { - + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class)); // immutable members @@ -74,7 +74,7 @@ public class MulticastProtocolListener extends MulticastListener implements Prot final ProtocolContext<ProtocolMessage> protocolContext) { super(numThreads, multicastAddress, configuration); - + if (protocolContext == null) { throw new IllegalArgumentException("Protocol Context may not be null."); } @@ -89,21 +89,21 @@ public class MulticastProtocolListener extends MulticastListener implements Prot @Override public void start() throws IOException { - if(super.isRunning()) { + if (super.isRunning()) { throw new IllegalStateException("Instance is already started."); } - + super.start(); - + } @Override public void stop() throws IOException { - if(super.isRunning() == false) { + if (super.isRunning() == false) { throw new IllegalStateException("Instance is already stopped."); } - + // shutdown listener super.stop(); @@ -116,17 +116,17 @@ public class MulticastProtocolListener extends MulticastListener implements Prot @Override public void addHandler(final ProtocolHandler handler) { - if(handler == null) { + if (handler == null) { throw new NullPointerException("Protocol handler may not be null."); } handlers.add(handler); } - + @Override public boolean removeHandler(final ProtocolHandler handler) { return handlers.remove(handler); } - + @Override public void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) { @@ -138,10 +138,10 @@ public class MulticastProtocolListener extends MulticastListener implements Prot // unwrap multicast message, if necessary final ProtocolMessage unwrappedRequest; - if(request instanceof MulticastProtocolMessage) { + if (request instanceof MulticastProtocolMessage) { final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request; // don't process a message we sent - if(listenerId.equals(multicastRequest.getId())) { + if (listenerId.equals(multicastRequest.getId())) { return; } else { unwrappedRequest = multicastRequest.getProtocolMessage(); @@ -149,7 +149,7 @@ public class MulticastProtocolListener extends MulticastListener implements Prot } else { unwrappedRequest = request; } - + // dispatch message to handler ProtocolHandler desiredHandler = null; for (final ProtocolHandler handler : getHandlers()) { @@ -164,28 +164,28 @@ public class MulticastProtocolListener extends MulticastListener implements Prot throw new ProtocolException("No handler assigned to handle message type: " + request.getType()); } else { final ProtocolMessage response = desiredHandler.handle(request); - if(response != null) { + if (response != null) { try { - + // wrap with listener id final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response); - + // marshal message final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); marshaller.marshal(multicastResponse, baos); final byte[] responseBytes = baos.toByteArray(); - + final int maxPacketSizeBytes = getMaxPacketSizeBytes(); - if(responseBytes.length > maxPacketSizeBytes) { - logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + - "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'"); + if (responseBytes.length > maxPacketSizeBytes) { + logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + + "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'"); } - + // create and send packet - final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); + final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); multicastSocket.send(responseDatagram); - + } catch (final IOException ioe) { throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe); } @@ -194,8 +194,8 @@ public class MulticastProtocolListener extends MulticastListener implements Prot } catch (final Throwable t) { logger.warn("Failed processing protocol message due to " + t, t); - - if ( bulletinRepository != null ) { + + if (bulletinRepository != null) { final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString()); bulletinRepository.addBulletin(bulletin); }
