http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java deleted file mode 100644 index 1edcb91..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; - -/** - * An interface for sending protocol messages from a node to the cluster manager. - * @author unattributed - */ -public interface NodeProtocolSender { - - /** - * Sends a "connection request" message to the cluster manager. - * @param msg a message - * @return the response - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a "heartbeat" message to the cluster manager. - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a bulletins message to the cluster manager. - * @param msg a message - * @throws ProtocolException if communication failed - * @throws UnknownServiceAddressException if the cluster manager's address is not known - */ - void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a failure notification if the controller was unable to start. 
- * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a failure notification if the node was unable to reconnect to the cluster - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java deleted file mode 100644 index b614e76..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * The context for communicating using the internal cluster protocol. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolContext<T> { - - /** - * Creates a marshaller for serializing protocol messages. 
- * @return a marshaller - */ - ProtocolMessageMarshaller<T> createMarshaller(); - - /** - * Creates an unmarshaller for deserializing protocol messages. - * @return a unmarshaller - */ - ProtocolMessageUnmarshaller<T> createUnmarshaller(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java deleted file mode 100644 index f11ad84..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * The base exception for problems encountered while communicating within the - * cluster. 
- * @author unattributed - */ -public class ProtocolException extends RuntimeException { - - public ProtocolException() { - } - - public ProtocolException(String msg) { - super(msg); - } - - public ProtocolException(Throwable cause) { - super(cause); - } - - public ProtocolException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java deleted file mode 100644 index 6de87db..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * A handler for processing protocol messages. - * @author unattributed - */ -public interface ProtocolHandler { - - /** - * Handles the given protocol message or throws an exception if it cannot - * handle the message. If no response is needed by the protocol, then null - * should be returned. - * - * @param msg a message - * @return a response or null, if no response is necessary - * - * @throws ProtocolException if the message could not be processed - */ - ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException; - - /** - * @param msg - * @return true if the handler can process the given message; false otherwise - */ - boolean canHandle(ProtocolMessage msg); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java deleted file mode 100644 index 32f0f5d..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.reporting.BulletinRepository; - -/** - * Defines the interface for a listener to process protocol messages. - * @author unattributed - */ -public interface ProtocolListener { - - /** - * Starts the instance for listening for messages. Start may only be called - * if the instance is not running. - * @throws java.io.IOException - */ - void start() throws IOException; - - /** - * Stops the instance from listening for messages. Stop may only be called - * if the instance is running. - * @throws java.io.IOException - */ - void stop() throws IOException; - - /** - * @return true if the instance is started; false otherwise. - */ - boolean isRunning(); - - /** - * @return the handlers registered with the listener - */ - Collection<ProtocolHandler> getHandlers(); - - /** - * Registers a handler with the listener. - * @param handler a handler - */ - void addHandler(ProtocolHandler handler); - - /** - * Sets the BulletinRepository that can be used to report bulletins - * @param bulletinRepository - */ - void setBulletinRepository(BulletinRepository bulletinRepository); - - /** - * Unregisters the handler with the listener. 
- * @param handler a handler - * @return true if the handler was removed; false otherwise - */ - boolean removeHandler(ProtocolHandler handler); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java deleted file mode 100644 index bb436e0..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Defines a marshaller for serializing protocol messages. - * - * @param <T> The type of protocol message. 
- * - * @author unattributed - */ -public interface ProtocolMessageMarshaller<T> { - - /** - * Serializes the given message to the given output stream. - * @param msg a message - * @param os an output stream - * @throws IOException if the message could not be serialized to the stream - */ - void marshal(T msg, OutputStream os) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java deleted file mode 100644 index c690e7b..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Defines an unmarshaller for deserializing protocol messages. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolMessageUnmarshaller<T> { - - /** - * Deserializes a message on the given input stream. - * @param is an input stream - * @return the deserialized message - * @throws IOException if the message could not be deserialized from the stream - */ - T unmarshal(InputStream is) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java deleted file mode 100644 index c2d16fc..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.DataFlow; -import java.io.Serializable; -import java.util.Arrays; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter; - -/** - * Represents a dataflow, which includes the raw bytes of the flow.xml and - * whether processors should be started automatically at application startup. - */ -@XmlJavaTypeAdapter(DataFlowAdapter.class) -public class StandardDataFlow implements Serializable, DataFlow { - - private final byte[] flow; - private final byte[] templateBytes; - private final byte[] snippetBytes; - - private boolean autoStartProcessors; - - /** - * Constructs an instance. - * - * @param flow a valid flow as bytes, which cannot be null - * @param templateBytes an XML representation of templates - * @param snippetBytes an XML representation of snippets - * - * NOTE(review): the arguments are stored as given, without a null check or a - * defensive copy, so this constructor itself never throws NullPointerException. - */ - public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) { - this.flow = flow; - this.templateBytes = templateBytes; - this.snippetBytes = snippetBytes; - } - - /** - * Constructs an instance from the given dataflow. The flow, template and - * snippet byte arrays are copied (a null array stays null) and the - * auto-start flag is carried over. - * - * @param toCopy the dataflow to copy - */ - public StandardDataFlow(final DataFlow toCopy) { - this.flow = copy(toCopy.getFlow()); - this.templateBytes = copy(toCopy.getTemplates()); - this.snippetBytes = copy(toCopy.getSnippets()); - this.autoStartProcessors = toCopy.isAutoStartProcessors(); - } - - private static byte[] copy(final byte[] bytes) { - return bytes == null ? 
null : Arrays.copyOf(bytes, bytes.length); - } - - /** - * @return the raw byte array of the flow - */ - public byte[] getFlow() { - return flow; - } - - /** - * @return the raw byte array of the templates - */ - public byte[] getTemplates() { - return templateBytes; - } - - /** - * @return the raw byte array of the snippets - */ - public byte[] getSnippets() { - return snippetBytes; - } - - /** - * @return true if processors should be automatically started at application - * startup; false otherwise - */ - public boolean isAutoStartProcessors() { - return autoStartProcessors; - } - - /** - * - * Sets the flag to automatically start processors at application startup. - * - * @param autoStartProcessors true if processors should be automatically - * started at application startup; false otherwise - */ - public void setAutoStartProcessors(final boolean autoStartProcessors) { - this.autoStartProcessors = autoStartProcessors; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java deleted file mode 100644 index 41c74eb..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * Represents the exceptional case when a service's address is not known. - * @author unattributed - */ -public class UnknownServiceAddressException extends RuntimeException { - - public UnknownServiceAddressException() { - } - - public UnknownServiceAddressException(String msg) { - super(msg); - } - - public UnknownServiceAddressException(Throwable cause) { - super(cause); - } - - public UnknownServiceAddressException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java deleted file mode 100644 index ceb3fcb..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.io.socket.SocketConfiguration; 
-import org.apache.nifi.io.socket.SocketUtils; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.util.FormatUtils; - -/** - * A protocol sender for sending protocol messages from the cluster manager to - * nodes. - * - * Connection-type requests (e.g., reconnection, disconnection) by nature of - * starting/stopping flow controllers take longer than other types of protocol - * messages. Therefore, a handshake timeout may be specified to lengthen the - * allowable time for communication with the node. - * - * @author unattributed - */ -public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender { - - - private final ProtocolContext<ProtocolMessage> protocolContext; - private final SocketConfiguration socketConfiguration; - private int handshakeTimeoutSeconds; - private volatile BulletinRepository bulletinRepository; - - public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - if(socketConfiguration == null) { - throw new IllegalArgumentException("Socket configuration may not be null."); - } else if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - this.socketConfiguration = socketConfiguration; - this.protocolContext = protocolContext; - this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - /** - * Requests the data flow from a node. 
- * @param msg a message - * @return the message response - * @throws @throws ProtocolException if the message failed to be sent or the response was malformed - */ - @Override - public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), false); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - final ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.FLOW_RESPONSE == response.getType()) { - return (FlowResponseMessage) response; - } else { - throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); - } - - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Requests a node to reconnect to the cluster. The configured value for - * handshake timeout is applied to the socket before making the request. 
- * @param msg a message - * @return the response - * @throws ProtocolException if the message failed to be sent or the response was malformed - */ - @Override - public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - // marshal message to output stream - try { - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - - final ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.RECONNECTION_RESPONSE == response.getType()) { - return (ReconnectionResponseMessage) response; - } else { - throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Requests a node to disconnect from the cluster. The configured value for - * handshake timeout is applied to the socket before making the request. 
- * @param msg a message - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void disconnect(final DisconnectMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - // marshal message to output stream - try { - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Assigns the primary role to a node. - * - * @param msg a message - * - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - - private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { - // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout - if(handshakeTimeoutSeconds >= 0) { - socket.setSoTimeout(handshakeTimeoutSeconds * 1000); - } - } - - public SocketConfiguration getSocketConfiguration() { - return socketConfiguration; - } - - public int getHandshakeTimeoutSeconds() { - return handshakeTimeoutSeconds; - } - - public void setHandshakeTimeout(final String handshakeTimeout) { - 
this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(handshakeTimeout, TimeUnit.SECONDS); - } - - private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) { - return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout); - } - - private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) { - try { - // create a socket - final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration); - if ( applyHandshakeTimeout ) { - setConnectionHandshakeTimeoutOnSocket(socket); - } - return socket; - } catch(final IOException ioe) { - throw new ProtocolException("Failed to create socket due to: " + ioe, ioe); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java deleted file mode 100644 index 933e5fa..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.reporting.BulletinRepository; - -/** - * A wrapper class for consolidating a protocol sender and listener for the cluster - * manager. 
- * - * @author unattributed - */ -public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener { - - private final ClusterManagerProtocolSender sender; - - private final ProtocolListener listener; - - public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) { - if(sender == null) { - throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null."); - } else if(listener == null) { - throw new IllegalArgumentException("ProtocolListener may not be null."); - } - this.sender = sender; - this.listener = listener; - } - - @Override - public void stop() throws IOException { - if(!isRunning()) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - @Override - public void start() throws IOException { - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - listener.start(); - } - - @Override - public boolean isRunning() { - return listener.isRunning(); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return listener.getHandlers(); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - listener.setBulletinRepository(bulletinRepository); - sender.setBulletinRepository(bulletinRepository); - } - - @Override - public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { - return sender.requestFlow(msg); - } - - @Override - public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { - return sender.requestReconnection(msg); - } - - @Override - public void 
disconnect(DisconnectMessage msg) throws ProtocolException { - sender.disconnect(msg); - } - - @Override - public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException { - sender.assignPrimaryRole(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java deleted file mode 100644 index 24e51e0..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery; -import org.apache.nifi.reporting.BulletinRepository; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation for discovering services by way of "service broadcast" type - * protocol messages over multicast. - * - * The client caller is responsible for starting and stopping the service - * discovery. The instance must be stopped before termination of the JVM to - * ensure proper resource clean-up. 
- * - * @author unattributed - */ -public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener { - - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class); - private final String serviceName; - private final MulticastConfiguration multicastConfiguration; - private final MulticastProtocolListener listener; - private volatile BulletinRepository bulletinRepository; - - /* - * guarded by this - */ - private DiscoverableService service; - - - public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress, - final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - - if (StringUtils.isBlank(serviceName)) { - throw new IllegalArgumentException("Service name may not be null or empty."); - } else if (multicastAddress == null) { - throw new IllegalArgumentException("Multicast address may not be null."); - } else if (multicastAddress.getAddress().isMulticastAddress() == false) { - throw new IllegalArgumentException("Multicast group must be a Class D address."); - } else if (protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } else if (multicastConfiguration == null) { - throw new IllegalArgumentException("Multicast configuration may not be null."); - } - - this.serviceName = serviceName; - this.multicastConfiguration = multicastConfiguration; - this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext); - listener.addHandler(new ClusterManagerServiceBroadcastHandler()); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - @Override - public synchronized DiscoverableService getService() { - return service; - } - - @Override - public InetSocketAddress getMulticastAddress() { - return listener.getMulticastAddress(); - } 
- - @Override - public Collection<ProtocolHandler> getHandlers() { - return Collections.unmodifiableCollection(listener.getHandlers()); - } - - @Override - public void addHandler(ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public boolean removeHandler(ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public boolean isRunning() { - return listener.isRunning(); - } - - @Override - public void start() throws IOException { - if (isRunning()) { - throw new IllegalStateException("Instance is already running."); - } - listener.start(); - } - - @Override - public void stop() throws IOException { - if (isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - public String getServiceName() { - return serviceName; - } - - public MulticastConfiguration getMulticastConfiguration() { - return multicastConfiguration; - } - - private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler { - - @Override - public boolean canHandle(final ProtocolMessage msg) { - return MessageType.SERVICE_BROADCAST == msg.getType(); - } - - @Override - public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { - synchronized (ClusterServiceDiscovery.this) { - if (canHandle(msg) == false) { - throw new ProtocolException("Handler cannot handle message type: " + msg.getType()); - } else { - final ServiceBroadcastMessage broadcastMsg = (ServiceBroadcastMessage) msg; - if (serviceName.equals(broadcastMsg.getServiceName())) { - final DiscoverableService oldService = service; - if (oldService == null - || broadcastMsg.getAddress().equalsIgnoreCase(oldService.getServiceAddress().getHostName()) == false - || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) { - service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort())); - final 
InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress(); - logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress()))); - } - } - return null; - } - } - } - } - - private String prettyPrint(final InetSocketAddress address) { - if (address == null) { - return "0.0.0.0:0"; - } else { - return address.getHostName() + ":" + address.getPort(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java deleted file mode 100644 index bebfde8..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.ServiceDiscovery; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements the ServiceLocator interface for locating the socket address - * of a cluster service. Depending on configuration, the address may be located - * using service discovery. If using service discovery, then the service methods - * must be used for starting and stopping discovery. - * - * Service discovery may be used in conjunction with a fixed port. In this case, - * the service discovery will yield the service IP/host while the fixed port will - * be used for the port. - * - * Alternatively, the instance may be configured with exact service location, in - * which case, no service discovery occurs and the caller will always receive the - * configured service. 
- * - * @author unattributed - */ -public class ClusterServiceLocator implements ServiceDiscovery { - - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class); - - private final String serviceName; - - private final ClusterServiceDiscovery serviceDiscovery; - - private final DiscoverableService fixedService; - - private final int fixedServicePort; - - private final AttemptsConfig attemptsConfig = new AttemptsConfig(); - - private final AtomicBoolean running = new AtomicBoolean(false); - - public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) { - if(serviceDiscovery == null) { - throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); - } - this.serviceDiscovery = serviceDiscovery; - this.fixedService = null; - this.fixedServicePort = 0; - this.serviceName = serviceDiscovery.getServiceName(); - } - - public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) { - if(serviceDiscovery == null) { - throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); - } - this.serviceDiscovery = serviceDiscovery; - this.fixedService = null; - this.fixedServicePort = fixedServicePort; - this.serviceName = serviceDiscovery.getServiceName(); - } - - public ClusterServiceLocator(final DiscoverableService fixedService) { - if(fixedService == null) { - throw new IllegalArgumentException("Service may not be null."); - } - this.serviceDiscovery = null; - this.fixedService = fixedService; - this.fixedServicePort = 0; - this.serviceName = fixedService.getServiceName(); - } - - @Override - public DiscoverableService getService() { - - final int numAttemptsValue; - final int secondsBetweenAttempts; - synchronized(this) { - numAttemptsValue = attemptsConfig.numAttempts; - secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts(); - } - - // try for a configured amount of attempts to retrieve the service address - for(int i = 0; i < 
numAttemptsValue; i++) { - - if(fixedService != null) { - return fixedService; - } else if(serviceDiscovery != null) { - - final DiscoverableService discoveredService = serviceDiscovery.getService(); - - // if we received an address - if(discoveredService != null) { - // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address - if(fixedServicePort > 0) { - // create service using discovered service name and address with fixed service port - final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort); - final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr); - return result; - } else { - return discoveredService; - } - } - } - - // could not obtain service address, so sleep a bit - try { - logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.", - serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts)); - Thread.sleep(secondsBetweenAttempts * 1000); - } catch(final InterruptedException ie) { - break; - } - - } - - return null; - } - - public boolean isRunning() { - if(serviceDiscovery != null) { - return serviceDiscovery.isRunning(); - } else { - return running.get(); - } - } - - public void start() throws IOException { - - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - if(serviceDiscovery != null) { - serviceDiscovery.start(); - } - running.set(true); - } - - public void stop() throws IOException { - - if(isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - if(serviceDiscovery != null) { - serviceDiscovery.stop(); - } - running.set(false); - } - - public synchronized void setAttemptsConfig(final AttemptsConfig config) { - if(config == null) { - throw new IllegalArgumentException("Attempts configuration may not be 
null."); - } - this.attemptsConfig.numAttempts = config.numAttempts; - this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts; - this.attemptsConfig.timeBetweenAttempsUnit = config.timeBetweenAttempsUnit; - } - - public synchronized AttemptsConfig getAttemptsConfig() { - final AttemptsConfig config = new AttemptsConfig(); - config.numAttempts = this.attemptsConfig.numAttempts; - config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts; - config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit; - return config; - } - - public static class AttemptsConfig { - - private int numAttempts = 1; - - private int timeBetweenAttempts = 1; - - private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS; - - public int getNumAttempts() { - return numAttempts; - } - - public void setNumAttempts(int numAttempts) { - if(numAttempts <= 0) { - throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts); - } - this.numAttempts = numAttempts; - } - - public TimeUnit getTimeBetweenAttemptsUnit() { - return timeBetweenAttempsUnit; - } - - public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } - this.timeBetweenAttempsUnit = timeBetweenAttempsUnit; - } - - public int getTimeBetweenAttempts() { - return timeBetweenAttempts; - } - - public void setTimeBetweenAttempts(int timeBetweenAttempts) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } - this.timeBetweenAttempts = timeBetweenAttempts; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java ---------------------------------------------------------------------- diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java deleted file mode 100644 index e9e7d5b..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Broadcasts services used by the clustering software using multicast communication. - * A configurable delay occurs after broadcasting the collection of services. - * - * The client caller is responsible for starting and stopping the broadcasting. - * The instance must be stopped before termination of the JVM to ensure proper - * resource clean-up. 
- * - * @author unattributed - */ -public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster { - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class)); - - private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>(); - - private final InetSocketAddress multicastAddress; - - private final MulticastConfiguration multicastConfiguration; - - private final ProtocolContext<ProtocolMessage> protocolContext; - - private final int broadcastDelayMs; - - private Timer broadcaster; - - private MulticastSocket multicastSocket; - - public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, - final MulticastConfiguration multicastConfiguration, - final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) { - - if(multicastAddress == null) { - throw new IllegalArgumentException("Multicast address may not be null."); - } else if(multicastAddress.getAddress().isMulticastAddress() == false) { - throw new IllegalArgumentException("Multicast group address is not a Class D IP address."); - } else if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } else if(multicastConfiguration == null) { - throw new IllegalArgumentException("Multicast configuration may not be null."); - } - - this.services.addAll(services); - this.multicastAddress = multicastAddress; - this.multicastConfiguration = multicastConfiguration; - this.protocolContext = protocolContext; - this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS); - } - - public void start() throws IOException { - - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - // setup socket - multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration); - - // setup broadcaster - broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true); - 
broadcaster.schedule(new TimerTask() { - @Override - public void run() { - for(final DiscoverableService service : services) { - try { - - final InetSocketAddress serviceAddress = service.getServiceAddress(); - logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", - service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort())); - - // create message - final ServiceBroadcastMessage msg = new ServiceBroadcastMessage(); - msg.setServiceName(service.getServiceName()); - msg.setAddress(serviceAddress.getHostName()); - msg.setPort(serviceAddress.getPort()); - - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(msg, baos); - final byte[] packetBytes = baos.toByteArray(); - - // send message - final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress); - multicastSocket.send(packet); - - } catch(final Exception ex) { - logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex); - } - } - } - }, 0, broadcastDelayMs); - } - - public boolean isRunning() { - return (broadcaster != null); - } - - public void stop() { - - if(isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - broadcaster.cancel(); - broadcaster = null; - - // close socket - MulticastUtils.closeQuietly(multicastSocket); - - } - - @Override - public int getBroadcastDelayMs() { - return broadcastDelayMs; - } - - @Override - public Set<DiscoverableService> getServices() { - return Collections.unmodifiableSet(services); - } - - @Override - public InetSocketAddress getMulticastAddress() { - return multicastAddress; - } - - @Override - public boolean addService(final DiscoverableService service) { - return services.add(service); - } - - 
@Override - public boolean removeService(final String serviceName) { - for(final DiscoverableService service : services) { - if(service.getServiceName().equals(serviceName)) { - return services.remove(service); - } - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java deleted file mode 100644 index 680df65..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * An input stream filter that keeps an in-memory copy of the leading bytes
 * read through it, up to a fixed limit. Bytes read beyond the limit are still
 * returned to the caller but are not copied. Only the {@code read} methods
 * copy data.
 */
public class CopyingInputStream extends FilterInputStream {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    private final int maxBytesToCopy;

    /**
     * @param in the stream to read from
     * @param maxBytesToCopy the maximum number of bytes to retain; a value of
     * zero or less means nothing is copied
     */
    public CopyingInputStream(final InputStream in, final int maxBytesToCopy) {
        // the delegate is kept by FilterInputStream in its protected 'in' field
        super(in);
        this.maxBytesToCopy = maxBytesToCopy;
    }

    @Override
    public int read() throws IOException {
        final int delegateRead = in.read();
        if (delegateRead != -1 && remainingCapacity() > 0) {
            baos.write(delegateRead);
        }

        return delegateRead;
    }

    @Override
    public int read(byte[] b) throws IOException {
        final int delegateRead = in.read(b);
        copy(b, 0, delegateRead);
        return delegateRead;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        final int delegateRead = in.read(b, off, len);
        copy(b, off, delegateRead);
        return delegateRead;
    }

    /**
     * @return a new array holding the bytes copied so far
     */
    public byte[] getBytesRead() {
        return baos.toByteArray();
    }

    /**
     * Writes the bytes copied so far to the given stream.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void writeBytes(final OutputStream out) throws IOException {
        baos.writeTo(out);
    }

    /**
     * @return the number of bytes copied so far; never more than the limit
     */
    public int getNumberOfBytesCopied() {
        return baos.size();
    }

    /**
     * Copies as many of the bytes just read as still fit under the limit.
     */
    private void copy(final byte[] buffer, final int off, final int bytesRead) {
        if (bytesRead <= 0) {
            // end of stream or nothing read
            return;
        }

        final int toCopy = Math.min(bytesRead, remainingCapacity());
        if (toCopy > 0) {
            baos.write(buffer, off, toCopy);
        }
    }

    /**
     * Number of bytes that may still be copied; clamped at zero so that a
     * negative limit never produces a negative write length.
     */
    private int remainingCapacity() {
        return Math.max(0, maxBytesToCopy - baos.size());
    }
}
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java deleted file mode 100644 index d3764b3..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastListener; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.events.BulletinFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a listener for protocol messages sent over multicast. If a message - * is of type MulticastProtocolMessage, then the underlying protocol message is - * passed to the handler. If the receiving handler produces a message response, - * then the message is wrapped with a MulticastProtocolMessage before being - * sent to the originator. - * - * The client caller is responsible for starting and stopping the listener. - * The instance must be stopped before termination of the JVM to ensure proper - * resource clean-up. 
- * - * @author unattributed - */ -public class MulticastProtocolListener extends MulticastListener implements ProtocolListener { - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class)); - - // immutable members - private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>(); - private final String listenerId = UUID.randomUUID().toString(); - private final ProtocolContext<ProtocolMessage> protocolContext; - private volatile BulletinRepository bulletinRepository; - - public MulticastProtocolListener( - final int numThreads, - final InetSocketAddress multicastAddress, - final MulticastConfiguration configuration, - final ProtocolContext<ProtocolMessage> protocolContext) { - - super(numThreads, multicastAddress, configuration); - - if (protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - this.protocolContext = protocolContext; - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - @Override - public void start() throws IOException { - - if(super.isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - super.start(); - - } - - @Override - public void stop() throws IOException { - - if(super.isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - // shutdown listener - super.stop(); - - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return Collections.unmodifiableCollection(handlers); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - if(handler == null) { - throw new NullPointerException("Protocol handler may not be null."); - } - handlers.add(handler); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return handlers.remove(handler); - } - - @Override - public void 
dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) { - - try { - - // unmarshall message - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - final ProtocolMessage request = unmarshaller.unmarshal(new ByteArrayInputStream(packet.getData(), 0, packet.getLength())); - - // unwrap multicast message, if necessary - final ProtocolMessage unwrappedRequest; - if(request instanceof MulticastProtocolMessage) { - final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request; - // don't process a message we sent - if(listenerId.equals(multicastRequest.getId())) { - return; - } else { - unwrappedRequest = multicastRequest.getProtocolMessage(); - } - } else { - unwrappedRequest = request; - } - - // dispatch message to handler - ProtocolHandler desiredHandler = null; - for (final ProtocolHandler handler : getHandlers()) { - if (handler.canHandle(unwrappedRequest)) { - desiredHandler = handler; - break; - } - } - - // if no handler found, throw exception; otherwise handle request - if (desiredHandler == null) { - throw new ProtocolException("No handler assigned to handle message type: " + request.getType()); - } else { - final ProtocolMessage response = desiredHandler.handle(request); - if(response != null) { - try { - - // wrap with listener id - final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response); - - // marshal message - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(multicastResponse, baos); - final byte[] responseBytes = baos.toByteArray(); - - final int maxPacketSizeBytes = getMaxPacketSizeBytes(); - if(responseBytes.length > maxPacketSizeBytes) { - logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + - "' produced a multicast response with length greater than 
configured max packet size '" + maxPacketSizeBytes + "'"); - } - - // create and send packet - final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); - multicastSocket.send(responseDatagram); - - } catch (final IOException ioe) { - throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe); - } - } - } - - } catch (final Throwable t) { - logger.warn("Failed processing protocol message due to " + t, t); - - if ( bulletinRepository != null ) { - final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString()); - bulletinRepository.addBulletin(bulletin); - } - } - } -}
