http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java deleted file mode 100644 index 9a97671..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen; - -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.listen.event.Event; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * An abstract processor that extends from AbstractListenEventProcessor and adds common functionality for - * batching events into a single FlowFile. - * - * @param <E> the type of Event - */ -public abstract class AbstractListenEventBatchingProcessor<E extends Event> extends AbstractListenEventProcessor<E> { - - public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Max Batch Size") - .description( - "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with " - + "the <Message Delimiter> up to this configured maximum number of messages") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1") - .required(true) - .build(); - public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .displayName("Batching Message Delimiter") - .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("\\n") - .required(true) - .build(); - - // it is only the array reference that is volatile - not the contents. - protected volatile byte[] messageDemarcatorBytes; - - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(NETWORK_INTF_NAME); - descriptors.add(PORT); - descriptors.add(RECV_BUFFER_SIZE); - descriptors.add(MAX_MESSAGE_QUEUE_SIZE); - descriptors.add(MAX_SOCKET_BUFFER_SIZE); - descriptors.add(CHARSET); - descriptors.add(MAX_BATCH_SIZE); - descriptors.add(MESSAGE_DELIMITER); - descriptors.addAll(getAdditionalProperties()); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.addAll(getAdditionalRelationships()); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - @OnScheduled - public void onScheduled(ProcessContext context) throws IOException { - super.onScheduled(context); - final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - messageDemarcatorBytes = msgDemarcator.getBytes(charset); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); - final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); - - // if the size is 0 then there was nothing to process so return - // we don't need to yield here because we have a long poll in side of getBatches - if (batches.size() == 0) { - return; - } - - final List<E> allEvents = new ArrayList<>(); - - for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) { - FlowFile flowFile = entry.getValue().getFlowFile(); - final List<E> events = entry.getValue().getEvents(); - - if (flowFile.getSize() == 0L || events.size() == 0) { - session.remove(flowFile); - getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); - continue; - } - - final Map<String,String> attributes = getAttributes(entry.getValue()); - flowFile = session.putAllAttributes(flowFile, attributes); - - getLogger().debug("Transferring {} to success", new Object[] {flowFile}); - session.transfer(flowFile, REL_SUCCESS); - session.adjustCounter("FlowFiles Transferred to Success", 1L, false); - - // the sender and command will be the same for all events based on the batch key - final String transitUri = getTransitUri(entry.getValue()); - session.getProvenanceReporter().receive(flowFile, transitUri); - - allEvents.addAll(events); - } - - // let sub-classes take any additional actions - postProcess(context, session, allEvents); - } - - /** - * Creates the attributes for the FlowFile of the given batch. - * - * @param batch the current batch - * @return the Map of FlowFile attributes - */ - protected abstract Map<String,String> getAttributes(final FlowFileEventBatch batch); - - /** - * Creates the transit uri to be used when reporting a provenance receive event for the given batch. - * - * @param batch the current batch - * @return the transit uri string - */ - protected abstract String getTransitUri(final FlowFileEventBatch batch); - - /** - * Called at the end of onTrigger to allow sub-classes to take post processing action on the events - * - * @param context the current context - * @param session the current session - * @param events the list of all events processed by the current execution of onTrigger - */ - protected void postProcess(ProcessContext context, ProcessSession session, final List<E> events) { - // empty implementation so sub-classes only have to override if necessary - } - - /** - * Batches together up to the batchSize events. Events are grouped together based on a batch key which - * by default is the sender of the event, but can be override by sub-classes. - * - * This method will return when batchSize has been reached, or when no more events are available on the queue. - * - * @param session the current session - * @param totalBatchSize the total number of events to process - * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile - * - * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all - * the batches will be <= batchSize - */ - protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize, - final byte[] messageDemarcatorBytes) { - - final Map<String,FlowFileEventBatch> batches = new HashMap<>(); - for (int i=0; i < totalBatchSize; i++) { - final E event = getMessage(true, true, session); - if (event == null) { - break; - } - - final String batchKey = getBatchKey(event); - FlowFileEventBatch batch = batches.get(batchKey); - - // if we don't have a batch for this key then create a new one - if (batch == null) { - batch = new FlowFileEventBatch(session.create(), new ArrayList<E>()); - batches.put(batchKey, batch); - } - - // add the current event to the batch - batch.getEvents().add(event); - - // append the event's data to the FlowFile, write the demarcator first if not on the first event - final boolean writeDemarcator = (i > 0); - try { - final byte[] rawMessage = event.getData(); - FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - if (writeDemarcator) { - out.write(messageDemarcatorBytes); - } - - out.write(rawMessage); - } - }); - - // update the FlowFile reference in the batch object - batch.setFlowFile(appendedFlowFile); - - } catch (final Exception e) { - getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", - new Object[] {e.getMessage()}, e); - errorEvents.offer(event); - break; - } - } - - return batches; - } - - /** - * @param event an event that was pulled off the queue - * - * @return a key to use for batching events together, by default this uses the sender of the - * event, but sub-classes should override this to batch by something else - */ - protected String getBatchKey(final E event) { - return event.getSender(); - } - - /** - * Wrapper to hold a FlowFile and the events that have been appended to it. - */ - protected final class FlowFileEventBatch { - - private FlowFile flowFile; - private List<E> events; - - public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) { - this.flowFile = flowFile; - this.events = events; - } - - public FlowFile getFlowFile() { - return flowFile; - } - - public List<E> getEvents() { - return events; - } - - public void setFlowFile(FlowFile flowFile) { - this.flowFile = flowFile; - } - } - -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java deleted file mode 100644 index 43d01b8..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen; - -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * An abstract processor to extend from when listening for events over a channel. This processor - * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, in a background - * thread which will end up placing events on a queue to polled by the onTrigger method. Sub-classes - * are responsible for providing the dispatcher implementations. - * - * @param <E> the type of events being produced - */ -public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor { - - - - public static final PropertyDescriptor PORT = new PropertyDescriptor - .Builder().name("Port") - .description("The port to listen on for communication.") - .required(true) - .addValidator(StandardValidators.PORT_VALIDATOR) - .build(); - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("Specifies the character set of the received data.") - .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); - public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Receive Buffer Size") - .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " + - "incoming messages.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("65507 B") - .required(true) - .build(); - public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Size of Socket Buffer") - .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + - "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " + - "the data can be read, and incoming data will be dropped.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .required(true) - .build(); - public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() - .name("Max Size of Message Queue") - .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + - "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + - "memory used by the processor.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000") - .required(true) - .build(); - - // Putting these properties here so sub-classes don't have to redefine them, but they are - // not added to the properties by default since not all processors may need them - - public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max Number of TCP Connections") - .description("The maximum number of concurrent TCP connections to accept.") - .addValidator(StandardValidators.createLongValidator(1, 65535, true)) - .defaultValue("2") - .required(true) - .build(); - - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Messages received successfully will be sent out this relationship.") - .build(); - - public static final int POLL_TIMEOUT_MS = 20; - - protected Set<Relationship> relationships; - protected List<PropertyDescriptor> descriptors; - - protected volatile int port; - protected volatile Charset charset; - protected volatile ChannelDispatcher dispatcher; - protected volatile BlockingQueue<E> events; - protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>(); - - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(NETWORK_INTF_NAME); - descriptors.add(PORT); - descriptors.add(RECV_BUFFER_SIZE); - descriptors.add(MAX_MESSAGE_QUEUE_SIZE); - descriptors.add(MAX_SOCKET_BUFFER_SIZE); - descriptors.add(CHARSET); - descriptors.addAll(getAdditionalProperties()); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.addAll(getAdditionalRelationships()); - this.relationships = Collections.unmodifiableSet(relationships); - } - - /** - * Override to provide additional relationships for the processor. - * - * @return a list of relationships - */ - protected List<Relationship> getAdditionalRelationships() { - return Collections.EMPTY_LIST; - } - - /** - * Override to provide additional properties for the processor. - * - * @return a list of properties - */ - protected List<PropertyDescriptor> getAdditionalProperties() { - return Collections.EMPTY_LIST; - } - - @Override - public final Set<Relationship> getRelationships() { - return this.relationships; - } - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) throws IOException { - charset = Charset.forName(context.getProperty(CHARSET).getValue()); - port = context.getProperty(PORT).asInteger(); - events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); - - final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); - final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); - - InetAddress nicIPAddress = null; - if (!StringUtils.isEmpty(nicIPAddressStr)) { - NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr); - nicIPAddress = netIF.getInetAddresses().nextElement(); - } - - // create the dispatcher and call open() to bind to the given port - dispatcher = createDispatcher(context, events); - dispatcher.open(nicIPAddress, port, maxChannelBufferSize); - - // start a thread to run the dispatcher - final Thread readerThread = new Thread(dispatcher); - readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]"); - readerThread.setDaemon(true); - readerThread.start(); - } - - /** - * @param context the ProcessContext to retrieve property values from - * @return a ChannelDispatcher to handle incoming connections - * - * @throws IOException if unable to listen on the requested port - */ - protected abstract ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<E> events) throws IOException; - - // used for testing to access the random port that was selected - public final int getDispatcherPort() { - return dispatcher == null ? 0 : dispatcher.getPort(); - } - - public int getErrorQueueSize() { - return errorEvents.size(); - } - - public int getQueueSize() { - return events == null ? 0 : events.size(); - } - - @OnUnscheduled - public void onUnscheduled() { - if (dispatcher != null) { - dispatcher.close(); - } - } - - /** - * Creates a pool of ByteBuffers with the given size. - * - * @param poolSize the number of buffers to initialize the pool with - * @param bufferSize the size of each buffer - * @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize - */ - protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) { - final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize); - for (int i = 0; i < poolSize; i++) { - bufferPool.offer(ByteBuffer.allocate(bufferSize)); - } - return bufferPool; - } - - /** - * If pollErrorQueue is true, the error queue will be checked first and event will be - * returned from the error queue if available. - * - * If pollErrorQueue is false, or no data is in the error queue, the regular queue is polled. - * - * If longPoll is true, the regular queue will be polled with a short timeout, otherwise it will - * poll with no timeout which will return immediately. - * - * @param longPoll whether or not to poll the main queue with a small timeout - * @param pollErrorQueue whether or not to poll the error queue first - * - * @return an event from one of the queues, or null if none are available - */ - protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) { - E event = null; - if (pollErrorQueue) { - event = errorEvents.poll(); - } - - if (event == null) { - try { - if (longPoll) { - event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } else { - event = events.poll(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - } - - if (event != null) { - session.adjustCounter("Messages Received", 1L, false); - } - - return event; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java deleted file mode 100644 index 5e4c639..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.AttributeExpression; - -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.Set; - -/** - * Shared properties. - */ -public class ListenerProperties { - - private static final Set<String> interfaceSet = new HashSet<>(); - - static { - try { - final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces(); - while (interfaceEnum.hasMoreElements()) { - final NetworkInterface ifc = interfaceEnum.nextElement(); - interfaceSet.add(ifc.getName()); - } - } catch (SocketException e) { - } - } - - public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder() - .name("Local Network Interface") - .description("The name of a local network interface to be used to restrict listening to a specific LAN.") - .addValidator(new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - ValidationResult result = new ValidationResult.Builder() - .subject("Local Network Interface").valid(true).input(input).build(); - if (interfaceSet.contains(input.toLowerCase())) { - return result; - } - - String message; - String realValue = input; - try { - if (context.isExpressionLanguagePresent(input)) { - AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input); - realValue = ae.evaluate(); - } - - if (interfaceSet.contains(realValue.toLowerCase())) { - return result; - } - - message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString(); - - } catch (IllegalArgumentException e) { - message = "Not a valid AttributeExpression: " + e.getMessage(); - } - result = new ValidationResult.Builder().subject("Local Network Interface") - .valid(false).input(input).explanation(message).build(); - - return result; - } - }) - .expressionLanguageSupported(true) - .build(); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java deleted file mode 100644 index 5215a21..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.dispatcher; - -import java.nio.channels.SelectionKey; - -/** - * A ChannelDispatcher that handles channels asynchronously. - */ -public interface AsyncChannelDispatcher extends ChannelDispatcher { - - /** - * Informs the dispatcher that the connection for the given key is complete. - * - * @param key a key that was previously selected - */ - void completeConnection(SelectionKey key); - - /** - * Informs the dispatcher that the connection for the given key can be added back for selection. - * - * @param key a key that was previously selected - */ - void addBackForSelection(SelectionKey key); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java deleted file mode 100644 index 444aeb1..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.dispatcher; - -import java.io.IOException; -import java.net.InetAddress; - -/** - * Dispatches handlers for a given channel. - */ -public interface ChannelDispatcher extends Runnable { - - /** - * Opens the dispatcher listening on the given port and attempts to set the - * OS socket buffer to maxBufferSize. - * - * @param nicAddress the local network interface to listen on, if null will listen on the wildcard address - * which means listening on all local network interfaces - * - * @param port the port to listen on - * - * @param maxBufferSize the size to set the OS socket buffer to - * - * @throws IOException if an error occurred listening on the given port - */ - void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException; - - /** - * @return the port being listened to - */ - int getPort(); - - /** - * Closes all listeners and stops all handler threads. - */ - void close(); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java deleted file mode 100644 index 69a1998..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.dispatcher; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processor.util.listen.event.EventQueue; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -/** - * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for - * processing, otherwise the buffer is returned to the buffer pool. - */ -public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> implements ChannelDispatcher { - - private final EventFactory<E> eventFactory; - private final BlockingQueue<ByteBuffer> bufferPool; - private final EventQueue<E> events; - private final ComponentLog logger; - private final String sendingHost; - private final Integer sendingPort; - - private Selector selector; - private DatagramChannel datagramChannel; - private volatile boolean stopped = false; - - public DatagramChannelDispatcher(final EventFactory<E> eventFactory, - final BlockingQueue<ByteBuffer> bufferPool, - final BlockingQueue<E> events, - final ComponentLog logger) { - this(eventFactory, bufferPool, events, logger, null, null); - } - - public DatagramChannelDispatcher(final EventFactory<E> eventFactory, - final BlockingQueue<ByteBuffer> bufferPool, - final BlockingQueue<E> events, - final ComponentLog logger, - final String sendingHost, - final Integer sendingPort) { - this.eventFactory = eventFactory; - this.bufferPool = bufferPool; - this.logger = logger; - this.sendingHost = sendingHost; - this.sendingPort = sendingPort; - this.events = new EventQueue<>(events, logger); - - if (bufferPool == null || bufferPool.size() == 0) { - throw new IllegalArgumentException("A pool of available ByteBuffers is required"); - } - } - - @Override - public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { - stopped = false; - datagramChannel = DatagramChannel.open(); - datagramChannel.configureBlocking(false); - - if (maxBufferSize > 0) { - datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); - final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < maxBufferSize) { - logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " - + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " - + "maximum receive buffer"); - } - } - - // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it - datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port)); - - // if a sending host and port were provided then connect to that specific address to only receive - // datagrams from that host/port, otherwise we can receive datagrams from any host/port - if (sendingHost != null && sendingPort != null) { - datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort)); - } - - selector = Selector.open(); - datagramChannel.register(selector, SelectionKey.OP_READ); - } - - @Override - public void run() { - final ByteBuffer buffer = bufferPool.poll(); - while (!stopped) { - try { - int selected = selector.select(); - // if stopped the selector could already be closed which would result in a ClosedSelectorException - if (selected > 0 && !stopped) { - Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - // if stopped we don't want to modify the keys because close() may still be in progress - while (selectorKeys.hasNext() && !stopped) { - SelectionKey key = selectorKeys.next(); - selectorKeys.remove(); - if (!key.isValid()) { - continue; - } - DatagramChannel channel = (DatagramChannel) key.channel(); - SocketAddress socketAddress; - buffer.clear(); - while (!stopped && (socketAddress = channel.receive(buffer)) != null) { - String sender = ""; - if (socketAddress instanceof InetSocketAddress) { - sender = ((InetSocketAddress) socketAddress).getAddress().toString(); - } - - // create a byte array from the buffer - buffer.flip(); - byte bytes[] = new byte[buffer.limit()]; - buffer.get(bytes, 0, buffer.limit()); - - final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender); - final E event = eventFactory.create(bytes, metadata, null); - events.offer(event); - - buffer.clear(); - } - } - } - } catch (InterruptedException e) { - stopped = true; - Thread.currentThread().interrupt(); - } catch (IOException e) { - logger.error("Error reading from DatagramChannel", e); - } - } - - if (buffer != null) { - try { - bufferPool.put(buffer); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - @Override - public int getPort() { - return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort(); - } - - @Override - public void close() { - stopped = true; - if (selector != null) { - selector.wakeup(); - } - IOUtils.closeQuietly(selector); - IOUtils.closeQuietly(datagramChannel); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java deleted file mode 100644 index f2479f1..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.dispatcher; - -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -import java.nio.ByteBuffer; - -/** - * Wrapper class so we can attach a buffer and/or an SSLSocketChannel to the selector key. - * */ -public class SocketChannelAttachment { - - private final ByteBuffer byteBuffer; - private final SSLSocketChannel sslSocketChannel; - - public SocketChannelAttachment(final ByteBuffer byteBuffer, final SSLSocketChannel sslSocketChannel) { - this.byteBuffer = byteBuffer; - this.sslSocketChannel = sslSocketChannel; - } - - public ByteBuffer getByteBuffer() { - return byteBuffer; - } - - public SSLSocketChannel getSslSocketChannel() { - return sslSocketChannel; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java deleted file mode 100644 index b07fbb9..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.dispatcher; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.security.util.SslContextFactory; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.Channel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Accepts Socket connections on the given port and creates a handler for each connection to - * be executed by a thread pool. - */ -public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements AsyncChannelDispatcher { - - private final EventFactory<E> eventFactory; - private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory; - private final BlockingQueue<ByteBuffer> bufferPool; - private final BlockingQueue<E> events; - private final ComponentLog logger; - private final int maxConnections; - private final SSLContext sslContext; - private final SslContextFactory.ClientAuth clientAuth; - private final Charset charset; - - private ExecutorService executor; - private volatile boolean stopped = false; - private Selector selector; - private final BlockingQueue<SelectionKey> keyQueue; - private final AtomicInteger currentConnections = new AtomicInteger(0); - - public SocketChannelDispatcher(final EventFactory<E> eventFactory, - final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, - final BlockingQueue<ByteBuffer> bufferPool, - final BlockingQueue<E> events, - final ComponentLog logger, - final int maxConnections, - final SSLContext sslContext, - final Charset charset) { - this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, SslContextFactory.ClientAuth.REQUIRED, charset); - } - - public SocketChannelDispatcher(final EventFactory<E> eventFactory, - final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, - final BlockingQueue<ByteBuffer> bufferPool, - final BlockingQueue<E> events, - final ComponentLog logger, - final int maxConnections, - final SSLContext sslContext, - final SslContextFactory.ClientAuth clientAuth, - final Charset charset) { - this.eventFactory = eventFactory; - this.handlerFactory = handlerFactory; - this.bufferPool = bufferPool; - this.events = events; - this.logger = logger; - this.maxConnections = maxConnections; - this.keyQueue = new LinkedBlockingQueue<>(maxConnections); - this.sslContext = sslContext; - this.clientAuth = clientAuth; - this.charset = charset; - - if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { - throw new IllegalArgumentException( - "A pool of available ByteBuffers equal to the maximum number of connections is required"); - } - } - - @Override - public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { - stopped = false; - executor = Executors.newFixedThreadPool(maxConnections); - - final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.configureBlocking(false); - if (maxBufferSize > 0) { - serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); - final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < maxBufferSize) { - logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " - + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " - + "maximum receive buffer"); - } - } - - serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port)); - - selector = Selector.open(); - serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); - } - - @Override - public void run() { - while (!stopped) { - try { - int selected = selector.select(); - // if stopped the selector could already be closed which would result in a ClosedSelectorException - if (selected > 0 && !stopped){ - Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - // if stopped we don't want to modify the keys because close() may still be in progress - while (selectorKeys.hasNext() && !stopped) { - SelectionKey key = selectorKeys.next(); - selectorKeys.remove(); - if (!key.isValid()){ - continue; - } - if (key.isAcceptable()) { - // Handle new connections coming in - final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); - final SocketChannel socketChannel = channel.accept(); - // Check for available connections - if (currentConnections.incrementAndGet() > maxConnections){ - currentConnections.decrementAndGet(); - logger.warn("Rejecting connection from {} because max connections has been met", - new Object[]{ socketChannel.getRemoteAddress().toString() }); - IOUtils.closeQuietly(socketChannel); - continue; - } - logger.debug("Accepted incoming connection from {}", - new Object[]{socketChannel.getRemoteAddress().toString()}); - // Set socket to non-blocking, and register with selector - socketChannel.configureBlocking(false); - SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); - - // Prepare the byte buffer for the reads, clear it out - ByteBuffer buffer = bufferPool.poll(); - buffer.clear(); - buffer.mark(); - - // If we have an SSLContext then create an SSLEngine for the channel - SSLSocketChannel sslSocketChannel = null; - if (sslContext != null) { - final SSLEngine sslEngine = sslContext.createSSLEngine(); - sslEngine.setUseClientMode(false); - - switch (clientAuth) { - case REQUIRED: - sslEngine.setNeedClientAuth(true); - break; - case WANT: - sslEngine.setWantClientAuth(true); - break; - case NONE: - sslEngine.setNeedClientAuth(false); - sslEngine.setWantClientAuth(false); - break; - } - - sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel); - } - - // Attach the buffer and SSLSocketChannel to the key - SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslSocketChannel); - readKey.attach(attachment); - } else if (key.isReadable()) { - // Clear out the operations the select is interested in until done reading - key.interestOps(0); - // Create a handler based on the protocol and whether an SSLEngine was provided or not - final Runnable handler; - if (sslContext != null) { - handler = handlerFactory.createSSLHandler(key, this, charset, eventFactory, events, logger); - } else { - handler = handlerFactory.createHandler(key, this, charset, eventFactory, events, logger); - } - - // run the handler - executor.execute(handler); - } - } - } - // Add back all idle sockets to the select - SelectionKey key; - while((key = keyQueue.poll()) != null){ - key.interestOps(SelectionKey.OP_READ); - } - } catch (IOException e) { - logger.error("Error accepting connection from SocketChannel", e); - } - } - } - - @Override - public int getPort() { - // Return the port for the key listening for accepts - for(SelectionKey key : selector.keys()){ - if (key.isValid()) { - final Channel channel = key.channel(); - if (channel instanceof ServerSocketChannel) { - return ((ServerSocketChannel)channel).socket().getLocalPort(); - } - } - } - return 0; - } - - @Override - public void close() { - stopped = true; - if (selector != null) { - selector.wakeup(); - } - - if (executor != null) { - executor.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - if (selector != null) { - synchronized (selector.keys()) { - for (SelectionKey key : selector.keys()) { - IOUtils.closeQuietly(key.channel()); - } - } - } - IOUtils.closeQuietly(selector); - } - - @Override - public void completeConnection(SelectionKey key) { - // connection is done. Return the buffer to the pool - SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - try { - bufferPool.put(attachment.getByteBuffer()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - currentConnections.decrementAndGet(); - } - - @Override - public void addBackForSelection(SelectionKey key) { - keyQueue.offer(key); - selector.wakeup(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java deleted file mode 100644 index 83989f8..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.event; - -import org.apache.nifi.processor.util.listen.response.ChannelResponder; - -import java.nio.channels.SelectableChannel; - -/** - * An event that was read from a channel. - * - * @param <C> the type of SelectableChannel the event was read from - */ -public interface Event<C extends SelectableChannel> { - - /** - * @return the sending host of the data - */ - String getSender(); - - /** - * @return raw data for this event - */ - byte[] getData(); - - /** - * @return the responder to use for responding to this event, or null - * if responses are not supported - */ - ChannelResponder<C> getResponder(); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java deleted file mode 100644 index 1bd9f0d..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.event; - -import org.apache.nifi.processor.util.listen.response.ChannelResponder; - -import java.util.Map; - -/** - * Factory to create instances of a given type of Event. - */ -public interface EventFactory<E extends Event> { - - /** - * The key in the metadata map for the sender. - */ - String SENDER_KEY = "sender"; - - /** - * Creates an event for the given data and metadata. - * - * @param data raw data from a channel - * @param metadata additional metadata - * @param responder a responder for the event with the channel populated - * - * @return an instance of the given type - */ - E create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java deleted file mode 100644 index 54529cf..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.event; - -import java.util.HashMap; -import java.util.Map; - -/** - * Utility methods for EventFactory. - */ -public class EventFactoryUtil { - - public static Map<String,String> createMapWithSender(final String sender) { - Map<String,String> metadata = new HashMap<>(); - metadata.put(EventFactory.SENDER_KEY, sender); - return metadata; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java deleted file mode 100644 index 35e9ae0..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.event; - -import org.apache.commons.lang3.Validate; -import org.apache.nifi.logging.ComponentLog; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Wraps a BlockingQueue to centralize logic for offering events across UDP, TCP, and SSL. - * - * @param <E> the type of event - */ -public class EventQueue<E extends Event> { - - /** - * The default number of milliseconds to wait when offering new events to the queue. - */ - public static final long DEFAULT_OFFER_WAIT_MS = 100; - - private final long offerWaitMs; - private final BlockingQueue<E> events; - private final ComponentLog logger; - - public EventQueue(final BlockingQueue<E> events, final ComponentLog logger) { - this(events, DEFAULT_OFFER_WAIT_MS, logger); - } - - public EventQueue(final BlockingQueue<E> events, final long offerWaitMs, final ComponentLog logger) { - this.events = events; - this.offerWaitMs = offerWaitMs; - this.logger = logger; - Validate.notNull(this.events); - Validate.notNull(this.logger); - } - - /** - * Offers the given event to the events queue with a wait time, if the offer fails the event - * is dropped an error is logged. - * - * @param event the event to offer - * @throws InterruptedException if interrupted while waiting to offer - */ - public void offer(final E event) throws InterruptedException { - boolean queued = events.offer(event, offerWaitMs, TimeUnit.MILLISECONDS); - if (!queued) { - logger.error("Internal queue at maximum capacity, could not queue event"); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java deleted file mode 100644 index fa3699e..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.event; - -import org.apache.nifi.processor.util.listen.response.ChannelResponder; - -import java.nio.channels.SelectableChannel; - -/** - * Standard implementation of Event. - */ -public class StandardEvent<C extends SelectableChannel> implements Event<C> { - - private final String sender; - private final byte[] data; - private final ChannelResponder<C> responder; - - public StandardEvent(final String sender, final byte[] data, final ChannelResponder<C> responder) { - this.sender = sender; - this.data = data; - this.responder = responder; - } - - @Override - public String getSender() { - return sender; - } - - @Override - public byte[] getData() { - return data; - } - - public ChannelResponder<C> getResponder() { - return responder; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java deleted file mode 100644 index 9ae6161..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.event; - -import org.apache.nifi.processor.util.listen.response.ChannelResponder; - -import java.util.Map; - -/** - * EventFactory to create StandardEvent instances. - */ -public class StandardEventFactory implements EventFactory<StandardEvent> { - - @Override - public StandardEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { - String sender = null; - if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) { - sender = metadata.get(EventFactory.SENDER_KEY); - } - return new StandardEvent(sender, data, responder); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java deleted file mode 100644 index 84ef062..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventQueue; - -import java.nio.channels.SelectionKey; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Base class for all channel handlers. - */ -public abstract class ChannelHandler<E extends Event, D extends ChannelDispatcher> implements Runnable { - - protected final SelectionKey key; - protected final D dispatcher; - protected final Charset charset; - protected final EventFactory<E> eventFactory; - protected final EventQueue<E> events; - protected final ComponentLog logger; - - public ChannelHandler(final SelectionKey key, - final D dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger) { - this.key = key; - this.dispatcher = dispatcher; - this.charset = charset; - this.eventFactory = eventFactory; - this.logger = logger; - this.events = new EventQueue<E>(events, logger); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java deleted file mode 100644 index 9ca6bdd..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; - -import java.nio.channels.SelectionKey; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher. - */ -public interface ChannelHandlerFactory<E extends Event, D extends ChannelDispatcher> { - - ChannelHandler<E, D> createHandler(final SelectionKey key, - final D dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger); - - ChannelHandler<E, D> createSSLHandler(final SelectionKey key, - final D dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java deleted file mode 100644 index ef747e1..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -/** - * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS. - */ -public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> { - - private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - - public SSLSocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public void run() { - boolean eof = false; - SSLSocketChannel sslSocketChannel = null; - try { - int bytesRead; - final SocketChannel socketChannel = (SocketChannel) key.channel(); - final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - - // get the SSLSocketChannel from the attachment - sslSocketChannel = attachment.getSslSocketChannel(); - - // SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[] - final ByteBuffer socketBuffer = attachment.getByteBuffer(); - byte[] socketBufferArray = new byte[socketBuffer.limit()]; - - // read until no more data - try { - while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) { - processBuffer(sslSocketChannel, socketChannel, bytesRead, socketBufferArray); - logger.debug("bytes read from sslSocketChannel {}", new Object[]{bytesRead}); - } - } catch (SocketTimeoutException ste) { - // SSLSocketChannel will throw this exception when 0 bytes are read and the timeout threshold - // is exceeded, we don't want to close the connection in this case - bytesRead = 0; - } - - // Check for closed socket - if( bytesRead < 0 ){ - eof = true; - logger.debug("Reached EOF, closing connection"); - } else { - logger.debug("No more data available, returning for selection"); - } - } catch (ClosedByInterruptException | InterruptedException e) { - logger.debug("read loop interrupted, closing connection"); - // Treat same as closed socket - eof = true; - } catch (ClosedChannelException e) { - // ClosedChannelException doesn't have a message so handle it separately from IOException - logger.error("Error reading from channel due to channel being closed", e); - // Treat same as closed socket - eof = true; - } catch (IOException e) { - logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); - // Treat same as closed socket - eof = true; - } finally { - if(eof == true) { - IOUtils.closeQuietly(sslSocketChannel); - dispatcher.completeConnection(key); - } else { - dispatcher.addBackForSelection(key); - } - } - } - - /** - * Process the contents of the buffer. Give sub-classes a chance to override this behavior. - * - * @param sslSocketChannel the channel the data was read from - * @param socketChannel the socket channel being wrapped by sslSocketChannel - * @param bytesRead the number of bytes read - * @param buffer the buffer to process - * @throws InterruptedException thrown if interrupted while queuing events - */ - protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel, - final int bytesRead, final byte[] buffer) throws InterruptedException, IOException { - final InetAddress sender = socketChannel.socket().getInetAddress(); - - // go through the buffer looking for the end of each message - for (int i = 0; i < bytesRead; i++) { - final byte currByte = buffer[i]; - - // check if at end of a message - if (currByte == getDelimiter()) { - if (currBytes.size() > 0) { - final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); - final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.offer(event); - currBytes.reset(); - } - } else { - currBytes.write(currByte); - } - } - } - - @Override - public byte getDelimiter() { - return TCP_DELIMITER; - } - -}
