http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java new file mode 100644 index 0000000..69a1998 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.EventQueue; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for + * processing, otherwise the buffer is returned to the buffer pool. + */ +public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> implements ChannelDispatcher { + + private final EventFactory<E> eventFactory; + private final BlockingQueue<ByteBuffer> bufferPool; + private final EventQueue<E> events; + private final ComponentLog logger; + private final String sendingHost; + private final Integer sendingPort; + + private Selector selector; + private DatagramChannel datagramChannel; + private volatile boolean stopped = false; + + public DatagramChannelDispatcher(final EventFactory<E> eventFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ComponentLog logger) { + this(eventFactory, bufferPool, events, logger, null, null); + } + + public DatagramChannelDispatcher(final EventFactory<E> eventFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ComponentLog logger, + final String sendingHost, + final Integer sendingPort) { + this.eventFactory = eventFactory; + this.bufferPool = bufferPool; + this.logger = logger; + this.sendingHost = sendingHost; + this.sendingPort = sendingPort; + this.events = new EventQueue<>(events, logger); + + if (bufferPool == null || bufferPool.size() == 0) { + throw new IllegalArgumentException("A pool of available ByteBuffers is required"); + } + } + + @Override + public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { + stopped = false; + datagramChannel = DatagramChannel.open(); + datagramChannel.configureBlocking(false); + + if (maxBufferSize > 0) { + datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); + final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); + if (actualReceiveBufSize < maxBufferSize) { + logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " + + "maximum receive buffer"); + } + } + + // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it + datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port)); + + // if a sending host and port were provided then connect to that specific address to only receive + // datagrams from that host/port, otherwise we can receive datagrams from any host/port + if (sendingHost != null && sendingPort != null) { + datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort)); + } + + selector = Selector.open(); + datagramChannel.register(selector, SelectionKey.OP_READ); + } + + @Override + public void run() { + final ByteBuffer buffer = bufferPool.poll(); + while (!stopped) { + try { + int selected = selector.select(); + // if stopped the selector could already be closed which would result in a ClosedSelectorException + if (selected > 0 && !stopped) { + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + // if stopped we don't want to modify the keys because close() may still be in progress + while (selectorKeys.hasNext() && !stopped) { + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()) { + continue; + } + DatagramChannel channel = (DatagramChannel) key.channel(); + SocketAddress socketAddress; + buffer.clear(); + while (!stopped && (socketAddress = channel.receive(buffer)) != null) { + String sender = ""; + if (socketAddress instanceof InetSocketAddress) { + sender = ((InetSocketAddress) socketAddress).getAddress().toString(); + } + + // create a byte array from the buffer + buffer.flip(); + byte bytes[] = new byte[buffer.limit()]; + buffer.get(bytes, 0, buffer.limit()); + + final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender); + final E event = eventFactory.create(bytes, metadata, null); + events.offer(event); + + buffer.clear(); + } + } + } + } catch (InterruptedException e) { + stopped = true; + Thread.currentThread().interrupt(); + } catch (IOException e) { + logger.error("Error reading from DatagramChannel", e); + } + } + + if (buffer != null) { + try { + bufferPool.put(buffer); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public int getPort() { + return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort(); + } + + @Override + public void close() { + stopped = true; + if (selector != null) { + selector.wakeup(); + } + IOUtils.closeQuietly(selector); + IOUtils.closeQuietly(datagramChannel); + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java new file mode 100644 index 0000000..f2479f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import java.nio.ByteBuffer; + +/** + * Wrapper class so we can attach a buffer and/or an SSLSocketChannel to the selector key. + * */ +public class SocketChannelAttachment { + + private final ByteBuffer byteBuffer; + private final SSLSocketChannel sslSocketChannel; + + public SocketChannelAttachment(final ByteBuffer byteBuffer, final SSLSocketChannel sslSocketChannel) { + this.byteBuffer = byteBuffer; + this.sslSocketChannel = sslSocketChannel; + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + public SSLSocketChannel getSslSocketChannel() { + return sslSocketChannel; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java new file mode 100644 index 0000000..b07fbb9 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.security.util.SslContextFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.Channel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Accepts Socket connections on the given port and creates a handler for each connection to + * be executed by a thread pool. + */ +public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements AsyncChannelDispatcher { + + private final EventFactory<E> eventFactory; + private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory; + private final BlockingQueue<ByteBuffer> bufferPool; + private final BlockingQueue<E> events; + private final ComponentLog logger; + private final int maxConnections; + private final SSLContext sslContext; + private final SslContextFactory.ClientAuth clientAuth; + private final Charset charset; + + private ExecutorService executor; + private volatile boolean stopped = false; + private Selector selector; + private final BlockingQueue<SelectionKey> keyQueue; + private final AtomicInteger currentConnections = new AtomicInteger(0); + + public SocketChannelDispatcher(final EventFactory<E> eventFactory, + final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ComponentLog logger, + final int maxConnections, + final SSLContext sslContext, + final Charset charset) { + this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, SslContextFactory.ClientAuth.REQUIRED, charset); + } + + public SocketChannelDispatcher(final EventFactory<E> eventFactory, + final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ComponentLog logger, + final int maxConnections, + final SSLContext sslContext, + final SslContextFactory.ClientAuth clientAuth, + final Charset charset) { + this.eventFactory = eventFactory; + this.handlerFactory = handlerFactory; + this.bufferPool = bufferPool; + this.events = events; + this.logger = logger; + this.maxConnections = maxConnections; + this.keyQueue = new LinkedBlockingQueue<>(maxConnections); + this.sslContext = sslContext; + this.clientAuth = clientAuth; + this.charset = charset; + + if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { + throw new IllegalArgumentException( + "A pool of available ByteBuffers equal to the maximum number of connections is required"); + } + } + + @Override + public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { + stopped = false; + executor = Executors.newFixedThreadPool(maxConnections); + + final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + if (maxBufferSize > 0) { + serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); + final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF); + if (actualReceiveBufSize < maxBufferSize) { + logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " + + "maximum receive buffer"); + } + } + + serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port)); + + selector = Selector.open(); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + } + + @Override + public void run() { + while (!stopped) { + try { + int selected = selector.select(); + // if stopped the selector could already be closed which would result in a ClosedSelectorException + if (selected > 0 && !stopped){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + // if stopped we don't want to modify the keys because close() may still be in progress + while (selectorKeys.hasNext() && !stopped) { + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + if (key.isAcceptable()) { + // Handle new connections coming in + final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); + final SocketChannel socketChannel = channel.accept(); + // Check for available connections + if (currentConnections.incrementAndGet() > maxConnections){ + currentConnections.decrementAndGet(); + logger.warn("Rejecting connection from {} because max connections has been met", + new Object[]{ socketChannel.getRemoteAddress().toString() }); + IOUtils.closeQuietly(socketChannel); + continue; + } + logger.debug("Accepted incoming connection from {}", + new Object[]{socketChannel.getRemoteAddress().toString()}); + // Set socket to non-blocking, and register with selector + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + + // Prepare the byte buffer for the reads, clear it out + ByteBuffer buffer = bufferPool.poll(); + buffer.clear(); + buffer.mark(); + + // If we have an SSLContext then create an SSLEngine for the channel + SSLSocketChannel sslSocketChannel = null; + if (sslContext != null) { + final SSLEngine sslEngine = sslContext.createSSLEngine(); + sslEngine.setUseClientMode(false); + + switch (clientAuth) { + case REQUIRED: + sslEngine.setNeedClientAuth(true); + break; + case WANT: + sslEngine.setWantClientAuth(true); + break; + case NONE: + sslEngine.setNeedClientAuth(false); + sslEngine.setWantClientAuth(false); + break; + } + + sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel); + } + + // Attach the buffer and SSLSocketChannel to the key + SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslSocketChannel); + readKey.attach(attachment); + } else if (key.isReadable()) { + // Clear out the operations the select is interested in until done reading + key.interestOps(0); + // Create a handler based on the protocol and whether an SSLEngine was provided or not + final Runnable handler; + if (sslContext != null) { + handler = handlerFactory.createSSLHandler(key, this, charset, eventFactory, events, logger); + } else { + handler = handlerFactory.createHandler(key, this, charset, eventFactory, events, logger); + } + + // run the handler + executor.execute(handler); + } + } + } + // Add back all idle sockets to the select + SelectionKey key; + while((key = keyQueue.poll()) != null){ + key.interestOps(SelectionKey.OP_READ); + } + } catch (IOException e) { + logger.error("Error accepting connection from SocketChannel", e); + } + } + } + + @Override + public int getPort() { + // Return the port for the key listening for accepts + for(SelectionKey key : selector.keys()){ + if (key.isValid()) { + final Channel channel = key.channel(); + if (channel instanceof ServerSocketChannel) { + return ((ServerSocketChannel)channel).socket().getLocalPort(); + } + } + } + return 0; + } + + @Override + public void close() { + stopped = true; + if (selector != null) { + selector.wakeup(); + } + + if (executor != null) { + executor.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + if (selector != null) { + synchronized (selector.keys()) { + for (SelectionKey key : selector.keys()) { + IOUtils.closeQuietly(key.channel()); + } + } + } + IOUtils.closeQuietly(selector); + } + + @Override + public void completeConnection(SelectionKey key) { + // connection is done. Return the buffer to the pool + SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + try { + bufferPool.put(attachment.getByteBuffer()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + currentConnections.decrementAndGet(); + } + + @Override + public void addBackForSelection(SelectionKey key) { + keyQueue.offer(key); + selector.wakeup(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java new file mode 100644 index 0000000..83989f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.nio.channels.SelectableChannel; + +/** + * An event that was read from a channel. + * + * @param <C> the type of SelectableChannel the event was read from + */ +public interface Event<C extends SelectableChannel> { + + /** + * @return the sending host of the data + */ + String getSender(); + + /** + * @return raw data for this event + */ + byte[] getData(); + + /** + * @return the responder to use for responding to this event, or null + * if responses are not supported + */ + ChannelResponder<C> getResponder(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java new file mode 100644 index 0000000..1bd9f0d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.util.Map; + +/** + * Factory to create instances of a given type of Event. + */ +public interface EventFactory<E extends Event> { + + /** + * The key in the metadata map for the sender. + */ + String SENDER_KEY = "sender"; + + /** + * Creates an event for the given data and metadata. + * + * @param data raw data from a channel + * @param metadata additional metadata + * @param responder a responder for the event with the channel populated + * + * @return an instance of the given type + */ + E create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java new file mode 100644 index 0000000..54529cf --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import java.util.HashMap; +import java.util.Map; + +/** + * Utility methods for EventFactory. + */ +public class EventFactoryUtil { + + public static Map<String,String> createMapWithSender(final String sender) { + Map<String,String> metadata = new HashMap<>(); + metadata.put(EventFactory.SENDER_KEY, sender); + return metadata; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java new file mode 100644 index 0000000..35e9ae0 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.logging.ComponentLog; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Wraps a BlockingQueue to centralize logic for offering events across UDP, TCP, and SSL. + * + * @param <E> the type of event + */ +public class EventQueue<E extends Event> { + + /** + * The default number of milliseconds to wait when offering new events to the queue. + */ + public static final long DEFAULT_OFFER_WAIT_MS = 100; + + private final long offerWaitMs; + private final BlockingQueue<E> events; + private final ComponentLog logger; + + public EventQueue(final BlockingQueue<E> events, final ComponentLog logger) { + this(events, DEFAULT_OFFER_WAIT_MS, logger); + } + + public EventQueue(final BlockingQueue<E> events, final long offerWaitMs, final ComponentLog logger) { + this.events = events; + this.offerWaitMs = offerWaitMs; + this.logger = logger; + Validate.notNull(this.events); + Validate.notNull(this.logger); + } + + /** + * Offers the given event to the events queue with a wait time, if the offer fails the event + * is dropped an error is logged. + * + * @param event the event to offer + * @throws InterruptedException if interrupted while waiting to offer + */ + public void offer(final E event) throws InterruptedException { + boolean queued = events.offer(event, offerWaitMs, TimeUnit.MILLISECONDS); + if (!queued) { + logger.error("Internal queue at maximum capacity, could not queue event"); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java new file mode 100644 index 0000000..fa3699e --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.nio.channels.SelectableChannel; + +/** + * Standard implementation of Event. + */ +public class StandardEvent<C extends SelectableChannel> implements Event<C> { + + private final String sender; + private final byte[] data; + private final ChannelResponder<C> responder; + + public StandardEvent(final String sender, final byte[] data, final ChannelResponder<C> responder) { + this.sender = sender; + this.data = data; + this.responder = responder; + } + + @Override + public String getSender() { + return sender; + } + + @Override + public byte[] getData() { + return data; + } + + public ChannelResponder<C> getResponder() { + return responder; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java new file mode 100644 index 0000000..9ae6161 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.util.Map; + +/** + * EventFactory to create StandardEvent instances. + */ +public class StandardEventFactory implements EventFactory<StandardEvent> { + + @Override + public StandardEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { + String sender = null; + if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) { + sender = metadata.get(EventFactory.SENDER_KEY); + } + return new StandardEvent(sender, data, responder); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java new file mode 100644 index 0000000..84ef062 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventQueue; + +import java.nio.channels.SelectionKey; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Base class for all channel handlers. + */ +public abstract class ChannelHandler<E extends Event, D extends ChannelDispatcher> implements Runnable { + + protected final SelectionKey key; + protected final D dispatcher; + protected final Charset charset; + protected final EventFactory<E> eventFactory; + protected final EventQueue<E> events; + protected final ComponentLog logger; + + public ChannelHandler(final SelectionKey key, + final D dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger) { + this.key = key; + this.dispatcher = dispatcher; + this.charset = charset; + this.eventFactory = eventFactory; + this.logger = logger; + this.events = new EventQueue<E>(events, logger); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java new file mode 100644 index 0000000..9ca6bdd --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; + +import java.nio.channels.SelectionKey; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher. + */ +public interface ChannelHandlerFactory<E extends Event, D extends ChannelDispatcher> { + + ChannelHandler<E, D> createHandler(final SelectionKey key, + final D dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger); + + ChannelHandler<E, D> createSSLHandler(final SelectionKey key, + final D dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java new file mode 100644 index 0000000..ef747e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS. + */ +public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> { + + private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); + + public SSLSocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public void run() { + boolean eof = false; + SSLSocketChannel sslSocketChannel = null; + try { + int bytesRead; + final SocketChannel socketChannel = (SocketChannel) key.channel(); + final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + + // get the SSLSocketChannel from the attachment + sslSocketChannel = attachment.getSslSocketChannel(); + + // SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[] + final ByteBuffer socketBuffer = attachment.getByteBuffer(); + byte[] socketBufferArray = new byte[socketBuffer.limit()]; + + // read until no more data + try { + while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) { + processBuffer(sslSocketChannel, socketChannel, bytesRead, socketBufferArray); + logger.debug("bytes read from sslSocketChannel {}", new Object[]{bytesRead}); + } + } catch (SocketTimeoutException ste) { + // SSLSocketChannel will throw this exception when 0 bytes are read and the timeout threshold + // is exceeded, we don't want to close the connection in this case + bytesRead = 0; + } + + // Check for closed socket + if( bytesRead < 0 ){ + eof = true; + logger.debug("Reached EOF, closing connection"); + } else { + logger.debug("No more data available, returning for selection"); + } + } catch (ClosedByInterruptException | InterruptedException e) { + logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket + eof = true; + } catch (ClosedChannelException e) { + // ClosedChannelException doesn't have a message so handle it separately from IOException + logger.error("Error reading from channel due to channel being closed", e); + // Treat same as closed socket + eof = true; + } catch (IOException e) { + logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); + // Treat same as closed socket + eof = true; + } finally { + if(eof == true) { + IOUtils.closeQuietly(sslSocketChannel); + dispatcher.completeConnection(key); + } else { + dispatcher.addBackForSelection(key); + } + } + } + + /** + * Process the contents of the buffer. Give sub-classes a chance to override this behavior. + * + * @param sslSocketChannel the channel the data was read from + * @param socketChannel the socket channel being wrapped by sslSocketChannel + * @param bytesRead the number of bytes read + * @param buffer the buffer to process + * @throws InterruptedException thrown if interrupted while queuing events + */ + protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel, + final int bytesRead, final byte[] buffer) throws InterruptedException, IOException { + final InetAddress sender = socketChannel.socket().getInetAddress(); + + // go through the buffer looking for the end of each message + for (int i = 0; i < bytesRead; i++) { + final byte currByte = buffer[i]; + + // check if at end of a message + if (currByte == getDelimiter()) { + if (currBytes.size() > 0) { + final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); + final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); + events.offer(event); + currBytes.reset(); + } + } else { + currBytes.write(currByte); + } + } + } + + @Override + public byte getDelimiter() { + return TCP_DELIMITER; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java new file mode 100644 index 0000000..07b5dcc --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandler; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Base class for socket channel handlers. + */ +public abstract class SocketChannelHandler<E extends Event<SocketChannel>> extends ChannelHandler<E, AsyncChannelDispatcher> { + + static final byte TCP_DELIMITER = '\n'; + + public SocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + } + + /** + * @return the byte used as the delimiter between messages for the given handler + */ + public abstract byte getDelimiter(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java new file mode 100644 index 0000000..9003f90 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandler; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Default factory for creating socket channel handlers. + */ +public class SocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> { + + @Override + public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger) { + return new StandardSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger) { + return new SSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java new file mode 100644 index 0000000..250168c --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Reads from the given SocketChannel into the provided buffer. If the given delimiter is found, the data + * read up to that point is queued for processing. + */ +public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> { + + private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); + + public StandardSocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ComponentLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public void run() { + boolean eof = false; + SocketChannel socketChannel = null; + + try { + int bytesRead; + socketChannel = (SocketChannel) key.channel(); + + final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + final ByteBuffer socketBuffer = attachment.getByteBuffer(); + + // read until the buffer is full + while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { + // prepare byte buffer for reading + socketBuffer.flip(); + // mark the current position as start, in case of partial message read + socketBuffer.mark(); + // process the contents that have been read into the buffer + processBuffer(socketChannel, socketBuffer); + + // Preserve bytes in buffer for next call to run + // NOTE: This code could benefit from the two ByteBuffer read calls to avoid + // this compact for higher throughput + socketBuffer.reset(); + socketBuffer.compact(); + logger.debug("bytes read {}", new Object[]{bytesRead}); + } + + // Check for closed socket + if( bytesRead < 0 ){ + eof = true; + logger.debug("Reached EOF, closing connection"); + } else { + logger.debug("No more data available, returning for selection"); + } + } catch (ClosedByInterruptException | InterruptedException e) { + logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket + eof = true; + } catch (ClosedChannelException e) { + // ClosedChannelException doesn't have a message so handle it separately from IOException + logger.error("Error reading from channel due to channel being closed", e); + // Treat same as closed socket + eof = true; + } catch (IOException e) { + logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); + // Treat same as closed socket + eof = true; + } finally { + if(eof == true) { + IOUtils.closeQuietly(socketChannel); + dispatcher.completeConnection(key); + } else { + dispatcher.addBackForSelection(key); + } + } + } + + /** + * Process the contents that have been read into the buffer. Allow sub-classes to override this behavior. + * + * @param socketChannel the channel the data was read from + * @param socketBuffer the buffer the data was read into + * @throws InterruptedException if interrupted when queuing events + */ + protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) throws InterruptedException, IOException { + // get total bytes in buffer + final int total = socketBuffer.remaining(); + final InetAddress sender = socketChannel.socket().getInetAddress(); + + // go through the buffer looking for the end of each message + currBytes.reset(); + for (int i = 0; i < total; i++) { + // NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved + // Pull data out of buffer and cram into byte array + byte currByte = socketBuffer.get(); + + // check if at end of a message + if (currByte == getDelimiter()) { + if (currBytes.size() > 0) { + final SocketChannelResponder response = new SocketChannelResponder(socketChannel); + final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); + events.offer(event); + currBytes.reset(); + + // Mark this as the start of the next message + socketBuffer.mark(); + } + } else { + currBytes.write(currByte); + } + } + } + + @Override + public byte getDelimiter() { + return TCP_DELIMITER; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java new file mode 100644 index 0000000..978f3ac --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.response; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.util.List; + +/** + * A responder for a given channel. + * + * @param <C> The type of SelectableChannel where the response will be written. + */ +public interface ChannelResponder<C extends SelectableChannel> { + + /** + * @return a SelectableChannel to write the response to + */ + C getChannel(); + + /** + * @return a list of responses to write to the channel + */ + List<ChannelResponse> getResponses(); + + /** + * @param response adds the given response to the list of responses + */ + void addResponse(ChannelResponse response); + + /** + * Writes the responses to the underlying channel. + */ + void respond() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java new file mode 100644 index 0000000..98f0301 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.response; + +/** + * A response to send back over channel. + */ +public interface ChannelResponse { + + /** + * @return the bytes that should be written to a channel for this response + */ + byte[] toByteArray(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java new file mode 100644 index 0000000..20102ba --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.response.socket; + +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import java.io.IOException; +import java.nio.channels.SocketChannel; + +/** + * A ChannelResponder for SSLSocketChannels. + */ +public class SSLSocketChannelResponder extends SocketChannelResponder { + + private SSLSocketChannel sslSocketChannel; + + public SSLSocketChannelResponder(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel) { + super(socketChannel); + this.sslSocketChannel = sslSocketChannel; + } + + @Override + public void respond() throws IOException { + for (final ChannelResponse response : responses) { + sslSocketChannel.write(response.toByteArray()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java new file mode 100644 index 0000000..5c20bf0 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.response.socket; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A ChannelResponder for SocketChannels. The SocketChannel should first be registered with a selector, + * upon being selected for writing the respond() method should be executed. + */ +public class SocketChannelResponder implements ChannelResponder<SocketChannel> { + + protected final List<ChannelResponse> responses; + protected final SocketChannel socketChannel; + + public SocketChannelResponder(final SocketChannel socketChannel) { + this.responses = new ArrayList<>(); + this.socketChannel = socketChannel; + } + + @Override + public SocketChannel getChannel() { + return socketChannel; + } + + @Override + public List<ChannelResponse> getResponses() { + return Collections.unmodifiableList(responses); + } + + @Override + public void addResponse(ChannelResponse response) { + this.responses.add(response); + } + + @Override + public void respond() throws IOException { + for (final ChannelResponse response : responses) { + final ByteBuffer responseBuffer = ByteBuffer.wrap(response.toByteArray()); + + while (responseBuffer.hasRemaining()) { + socketChannel.write(responseBuffer); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java new file mode 100644 index 0000000..f97f31d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +/** + * Represents a looping process was discontinued. + * When a method throws this exception, its caller should stop processing further inputs and stop immediately. + */ +public class DiscontinuedException extends RuntimeException { + public DiscontinuedException(String message) { + super(message); + } + + public DiscontinuedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java new file mode 100644 index 0000000..c6cf140 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield; + +/** + * Represents general error types and how it should be treated. + */ +public enum ErrorTypes { + + /** + * Procedure setting has to be fixed, otherwise the same error would occur irrelevant to the input. + * In order to NOT call failing process frequently, this should be yielded. + */ + PersistentFailure(ProcessException, Yield), + + /** + * It is unknown whether the error is persistent or temporal, related to the input or not. + */ + UnknownFailure(ProcessException, None), + + /** + * The input will be sent to the failure route for recovery without penalizing. + * Basically, the input should not be sent to the same procedure again unless the issue has been solved. + */ + InvalidInput(Failure, None), + + /** + * The procedure is temporarily unavailable, usually due to the external service unavailability. + * Retrying maybe successful, but it should be yielded for a while. + */ + TemporalFailure(Retry, Yield), + + /** + * The input was not processed successfully due to some temporal error + * related to the specifics of the input. Retrying maybe successful, + * but it should be penalized for a while. + */ + TemporalInputFailure(Retry, Penalize), + + /** + * The input was not ready for being processed. It will be kept in the incoming queue and also be penalized. + */ + Defer(Self, Penalize); + + private final Destination destination; + private final Penalty penalty; + ErrorTypes(Destination destination, Penalty penalty){ + this.destination = destination; + this.penalty = penalty; + } + + public Result result() { + return new Result(destination, penalty); + } + + /** + * Represents the destination of input. + */ + public enum Destination { + ProcessException, Failure, Retry, Self + } + + /** + * Indicating yield or penalize the processing when transfer the input. + */ + public enum Penalty { + Yield, Penalize, None + } + + public Destination destination(){ + return this.destination; + } + + public Penalty penalty(){ + return this.penalty; + } + + /** + * Result represents a result of a procedure. + * ErrorTypes enum contains basic error result patterns. + */ + public static class Result { + private final Destination destination; + private final Penalty penalty; + + public Result(Destination destination, Penalty penalty) { + this.destination = destination; + this.penalty = penalty; + } + + public Destination destination() { + return destination; + } + + public Penalty penalty() { + return penalty; + } + + @Override + public String toString() { + return "Result{" + + "destination=" + destination + + ", penalty=" + penalty + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Result result = (Result) o; + + if (destination != result.destination) return false; + return penalty == result.penalty; + } + + @Override + public int hashCode() { + int result = destination != null ? destination.hashCode() : 0; + result = 31 * result + (penalty != null ? penalty.hashCode() : 0); + return result; + } + } + +}
