http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java new file mode 100644 index 0000000..27d676a --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.socket; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; + +import org.apache.nifi.logging.NiFiLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class SocketUtils { + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class)); + + public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config) throws IOException { + if (address == null) { + throw new IllegalArgumentException("Socket address may not be null."); + } else if (config == null) { + throw new IllegalArgumentException("Configuration may not be null."); + } + + final Socket socket; + + final SSLContext sslContext; + try { + sslContext = config.createSSLContext(); + } catch (final Exception e) { + throw new IOException("Could not create SSLContext", e); + } + + if (sslContext == null) { + socket = new Socket(address.getHostName(), address.getPort()); + } else { + socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort()); + } + + if (config.getSocketTimeout() != null) { + socket.setSoTimeout(config.getSocketTimeout()); + } + + if (config.getReuseAddress() != null) { + socket.setReuseAddress(config.getReuseAddress()); + } + + if (config.getReceiveBufferSize() != null) { + socket.setReceiveBufferSize(config.getReceiveBufferSize()); + } + + if (config.getSendBufferSize() != null) { + socket.setSendBufferSize(config.getSendBufferSize()); + } + + if (config.getTrafficClass() != null) { + socket.setTrafficClass(config.getTrafficClass()); 
+ } + + if (config.getKeepAlive() != null) { + socket.setKeepAlive(config.getKeepAlive()); + } + + if (config.getOobInline() != null) { + socket.setOOBInline(config.getOobInline()); + } + + if (config.getTcpNoDelay() != null) { + socket.setTcpNoDelay(config.getTcpNoDelay()); + } + + return socket; + } + + public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) + throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException { + if (config == null) { + throw new NullPointerException("Configuration may not be null."); + } + + final SSLContext sslContext = config.createSSLContext(); + final ServerSocket serverSocket; + if (sslContext == null) { + serverSocket = new ServerSocket(port); + } else { + serverSocket = sslContext.getServerSocketFactory().createServerSocket(port); + ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth()); + } + + if (config.getSocketTimeout() != null) { + serverSocket.setSoTimeout(config.getSocketTimeout()); + } + + if (config.getReuseAddress() != null) { + serverSocket.setReuseAddress(config.getReuseAddress()); + } + + if (config.getReceiveBufferSize() != null) { + serverSocket.setReceiveBufferSize(config.getReceiveBufferSize()); + } + + return serverSocket; + } + + public static void closeQuietly(final Socket socket) { + if (socket == null) { + return; + } + + try { + try { + // can't shudown input/output individually with secure sockets + if ((socket instanceof SSLSocket) == false) { + if (socket.isInputShutdown() == false) { + socket.shutdownInput(); + } + if (socket.isOutputShutdown() == false) { + socket.shutdownOutput(); + } + } + } finally { + if (socket.isClosed() == false) { + socket.close(); + } + } + } catch (final Exception ex) { + logger.debug("Failed to close socket due to: " + ex, ex); + } + } + + public static void closeQuietly(final ServerSocket serverSocket) { + if (serverSocket == 
null) { + return; + } + + try { + serverSocket.close(); + } catch (final Exception ex) { + logger.debug("Failed to close server socket due to: " + ex, ex); + } + } + +}
/**
 * Describes a service that can be located at runtime. Every service carries a
 * unique, case-sensitive name and the socket address at which it can be
 * reached.
 */
public interface DiscoverableService {

    /**
     * @return the socket address at which the service is available
     */
    InetSocketAddress getServiceAddress();

    /**
     * Returns the name identifying this service. Two services are treated as
     * equal when their names match, compared case-sensitively.
     *
     * @return the name of the service
     */
    String getServiceName();

}
+ * + */ +public class DiscoverableServiceImpl implements DiscoverableService { + + private final String serviceName; + + private final InetSocketAddress serviceAddress; + + public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) { + if (StringUtils.isBlank(serviceName)) { + throw new IllegalArgumentException("Service name may not be null or empty."); + } else if (serviceAddress == null) { + throw new IllegalArgumentException("Service address may not be null."); + } + this.serviceName = serviceName; + this.serviceAddress = serviceAddress; + } + + @Override + public InetSocketAddress getServiceAddress() { + return serviceAddress; + } + + @Override + public String getServiceName() { + return serviceName; + } + + @Override + public String toString() { + return String.format("[Discoverable Service: %s available at %s:%d]", serviceName, serviceAddress.getHostName(), serviceAddress.getPort()); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof DiscoverableService)) { + return false; + } + final DiscoverableService other = (DiscoverableService) obj; + return !((this.serviceName == null) ? (other.getServiceName() != null) : !this.serviceName.equals(other.getServiceName())); + } + + @Override + public int hashCode() { + int hash = 5; + hash = 53 * hash + (this.serviceName != null ? 
this.serviceName.hashCode() : 0); + return hash; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java new file mode 100644 index 0000000..d1c2156 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.socket.multicast; + +/** + */ +public final class MulticastConfiguration { + + private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL; + + private Integer socketTimeout; + + private Integer receiveBufferSize; + + private Integer sendBufferSize; + + private Boolean reuseAddress; + + private Integer trafficClass; + + private Boolean loopbackMode; + + public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET; + + public MulticastTimeToLive getTtl() { + return ttl; + } + + public void setTtl(final MulticastTimeToLive ttl) { + if (ttl == null) { + throw new NullPointerException("Multicast TTL may not be null."); + } + this.ttl = ttl; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(Integer socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public Boolean getReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(Boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public Integer getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(Integer receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public Integer getSendBufferSize() { + return sendBufferSize; + } + + public void setSendBufferSize(Integer sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + + public Integer getTrafficClass() { + return trafficClass; + } + + public void setTrafficClass(Integer trafficClass) { + this.trafficClass = trafficClass; + } + + public Boolean getLoopbackMode() { + return loopbackMode; + } + + public void setLoopbackMode(Boolean loopbackMode) { + this.loopbackMode = loopbackMode; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java ---------------------------------------------------------------------- diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java new file mode 100644 index 0000000..1ce2ea0 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a listener for protocol messages sent over multicast. If a message + * is of type MulticastProtocolMessage, then the underlying protocol message is + * passed to the handler. 
If the receiving handler produces a message response, + * then the message is wrapped with a MulticastProtocolMessage before being sent + * to the originator. + * + */ +public abstract class MulticastListener { + + // constants + private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5; + private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512; + + private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class)); + + // immutable members + private final int numThreads; + private final InetSocketAddress multicastAddress; + private final MulticastConfiguration configuration; + + private volatile ExecutorService executorService; // volatile to guarantee most current value is visible + private volatile MulticastSocket multicastSocket; // volatile to guarantee most current value is visible + + private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS; + private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES; + + public MulticastListener( + final int numThreads, + final InetSocketAddress multicastAddress, + final MulticastConfiguration configuration) { + + if (numThreads <= 0) { + throw new IllegalArgumentException("Number of threads may not be less than or equal to zero."); + } else if (multicastAddress == null) { + throw new IllegalArgumentException("Multicast address may not be null."); + } else if (multicastAddress.getAddress().isMulticastAddress() == false) { + throw new IllegalArgumentException("Multicast group must be a Class D address."); + } else if (configuration == null) { + throw new IllegalArgumentException("Multicast configuration may not be null."); + } + + this.numThreads = numThreads; + this.multicastAddress = multicastAddress; + this.configuration = configuration; + } + + /** + * Implements the action to perform when a new datagram is received. This + * class must not close the multicast socket. 
+ * + * @param multicastSocket socket + * @param packet the datagram packet + */ + public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet); + + public void start() throws IOException { + + if (isRunning()) { + return; + } + + multicastSocket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration); + multicastSocket.joinGroup(multicastAddress.getAddress()); + + executorService = Executors.newFixedThreadPool(numThreads); + + final ExecutorService runnableExecServiceRef = executorService; + final MulticastSocket runnableMulticastSocketRef = multicastSocket; + + new Thread(new Runnable() { + @Override + public void run() { + while (runnableExecServiceRef.isShutdown() == false) { + try { + final byte[] buf = new byte[maxPacketSizeBytes]; + final DatagramPacket packet = new DatagramPacket(buf, maxPacketSizeBytes); + runnableMulticastSocketRef.receive(packet); + runnableExecServiceRef.execute(new Runnable() { + @Override + public void run() { + dispatchRequest(multicastSocket, packet); + } + }); + } catch (final SocketException | SocketTimeoutException ste) { + /* ignore so that we can accept connections in approximately a non-blocking fashion */ + } catch (final Exception e) { + logger.warn("Cluster protocol receiver encountered exception: " + e, e); + } + } + } + }).start(); + } + + public boolean isRunning() { + return (executorService != null && executorService.isShutdown() == false); + } + + public void stop() throws IOException { + + if (isRunning() == false) { + return; + } + + // shutdown executor service + try { + if (getShutdownListenerSeconds() <= 0) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + if (executorService.isTerminated()) { + logger.info("Multicast Listener has been 
terminated successfully."); + } else { + logger.warn("Multicast Listener has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); + } + } + + // shutdown server socket + if (multicastSocket.isClosed() == false) { + multicastSocket.leaveGroup(multicastAddress.getAddress()); + multicastSocket.close(); + } + + } + + public int getShutdownListenerSeconds() { + return shutdownListenerSeconds; + } + + public void setShutdownListenerSeconds(final int shutdownListenerSeconds) { + this.shutdownListenerSeconds = shutdownListenerSeconds; + } + + public int getMaxPacketSizeBytes() { + return maxPacketSizeBytes; + } + + public void setMaxPacketSizeBytes(int maxPacketSizeBytes) { + if (maxPacketSizeBytes <= 0) { + throw new IllegalArgumentException("Max packet size must be greater than zero bytes."); + } + this.maxPacketSizeBytes = maxPacketSizeBytes; + } + + public MulticastConfiguration getConfiguration() { + return configuration; + } + + public InetSocketAddress getMulticastAddress() { + return multicastAddress; + } + + public int getNumThreads() { + return numThreads; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java new file mode 100644 index 0000000..212e20c --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.net.InetSocketAddress; + +/** + * Defines the interface for discovering services based on name. Services are + * expected to be exposed via socket address and port. + * + */ +public interface MulticastServiceDiscovery extends ServiceDiscovery { + + /** + * @return the multicast address + */ + InetSocketAddress getMulticastAddress(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java new file mode 100644 index 0000000..7ef5a5d --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.net.InetSocketAddress; + +/** + * Defines the interface for broadcasting a service via multicast. + * + */ +public interface MulticastServicesBroadcaster extends ServicesBroadcaster { + + /** + * @return the multicast address + */ + InetSocketAddress getMulticastAddress(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java new file mode 100644 index 0000000..3e96c61 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Named multicast time-to-live values. Each constant wraps the numeric TTL
 * that is handed to the socket.
 */
public enum MulticastTimeToLive {

    SAME_HOST(0),
    SAME_SUBNET(1),
    SAME_SITE(32),
    SAME_REGION(64),
    SAME_CONTINENT(128),
    UNRESTRICTED(255);

    private final int ttl;

    MulticastTimeToLive(final int ttl) {
        this.ttl = ttl;
    }

    /**
     * @return the numeric time-to-live of this constant
     */
    public int getTtl() {
        return ttl;
    }

    /**
     * Looks up the constant with exactly the given numeric time-to-live. The
     * lookup uses no instance state, so it is static; calling it through an
     * existing constant still compiles.
     *
     * @param ttl the numeric time-to-live to look for
     * @return the matching constant, or null if no constant has that value
     */
    public static MulticastTimeToLive valueOfByTtl(final int ttl) {
        for (final MulticastTimeToLive value : values()) {
            if (value.getTtl() == ttl) {
                return value;
            }
        }
        return null;
    }

}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.MulticastSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public final class MulticastUtils { + + private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class)); + + public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException { + return createMulticastSocket(0, config); + } + + public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException { + if (config == null) { + throw new IllegalArgumentException("Configuration may not be null."); + } + + final MulticastSocket socket; + if (port <= 0) { + socket = new MulticastSocket(); + } else { + socket = new MulticastSocket(port); + } + socket.setTimeToLive(config.getTtl().getTtl()); + + if (config.getSocketTimeout() != null) { + socket.setSoTimeout(config.getSocketTimeout()); + } + + if (config.getReuseAddress() != null) { + socket.setReuseAddress(config.getReuseAddress()); + } + + if (config.getReceiveBufferSize() != null) { + socket.setReceiveBufferSize(config.getReceiveBufferSize()); + } + + if (config.getSendBufferSize() != null) { + socket.setSendBufferSize(config.getSendBufferSize()); + } + + if (config.getTrafficClass() 
!= null) { + socket.setTrafficClass(config.getTrafficClass()); + } + + if (config.getLoopbackMode() != null) { + socket.setLoopbackMode(config.getLoopbackMode()); + } + + return socket; + } + + public static void closeQuietly(final MulticastSocket socket) { + + if (socket == null) { + return; + } + + try { + socket.close(); + } catch (final Exception ex) { + logger.debug("Failed to close multicast socket due to: " + ex, ex); + } + + } + + public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) { + + if (socket == null) { + return; + } + + try { + socket.leaveGroup(groupAddress); + } catch (final Exception ex) { + logger.debug("Failed to leave multicast group due to: " + ex, ex); + } + + try { + socket.close(); + } catch (final Exception ex) { + logger.debug("Failed to close multicast socket due to: " + ex, ex); + } + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java new file mode 100644 index 0000000..74c1a30 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +/** + * Defines a generic interface for discovering services. + * + */ +public interface ServiceDiscovery { + + /** + * @return the discovered service + */ + DiscoverableService getService(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java new file mode 100644 index 0000000..2a9f9b2 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.util.Set; + +/** + * Defines the interface for broadcasting a collection of services for client + * discovery. + * + */ +public interface ServicesBroadcaster { + + /** + * @return the delay in milliseconds to wait between successive broadcasts + */ + int getBroadcastDelayMs(); + + /** + * @return the broadcasted services + */ + Set<DiscoverableService> getServices(); + + /** + * Adds the given service to the set of broadcasted services. + * + * @param service a service + * @return true if the service was added to the set; false if a service with + * the given service name already exists in the set. + */ + boolean addService(DiscoverableService service); + + /** + * Removes the service with the given service name from the set.
+ * + * @param serviceName a service name + * @return true if the service was removed; false otherwise + */ + boolean removeService(String serviceName); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java new file mode 100644 index 0000000..a266ade --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio.example; + +import java.io.IOException; +import java.util.Calendar; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.io.nio.BufferPool; +import org.apache.nifi.io.nio.ChannelListener; +import org.apache.nifi.io.nio.consumer.StreamConsumer; +import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Example server: registers a datagram channel on port 20000 and server sockets on ports 20001 and
 * 20002 with a {@link ChannelListener}, then for thirty minutes periodically reaps the consumers
 * that have finished before shutting everything down.
 */
public final class ServerMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class);

    /** Pause between sweeps of the consumer map so the polling loops do not spin a CPU core. */
    private static final long SWEEP_INTERVAL_MILLIS = 100L;

    public static void main(final String[] args) throws IOException {
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");

        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap = new ConcurrentHashMap<>();
        final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0);
        ChannelListener listener = null;
        try {
            executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
            listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS, false);
            listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
            listener.addDatagramChannel(null, 20000, 32 << 20);
            LOGGER.info("Listening for UDP data on port 20000");
            listener.addServerSocket(null, 20001, 64 << 20);
            LOGGER.info("listening for TCP connections on port 20001");
            listener.addServerSocket(null, 20002, 64 << 20);
            LOGGER.info("listening for TCP connections on port 20002");
            final Calendar endTime = Calendar.getInstance();
            endTime.add(Calendar.MINUTE, 30);
            while (true) {
                processAllConsumers(consumerMap);
                if (endTime.before(Calendar.getInstance())) {
                    break; // time to shut down
                }
                if (!pauseBetweenSweeps()) {
                    break; // interrupted: shut down early
                }
            }
        } finally {
            if (listener != null) {
                LOGGER.info("Shutting down server....");
                listener.shutdown(1L, TimeUnit.SECONDS);
                LOGGER.info("Consumer map size = " + consumerMap.size());
                // drain: keep reaping until every consumer has reported that it is finished
                while (consumerMap.size() > 0) {
                    processAllConsumers(consumerMap);
                    if (consumerMap.size() > 0 && !pauseBetweenSweeps()) {
                        break; // interrupted while draining; stop waiting
                    }
                }
                LOGGER.info("Consumer map size = " + consumerMap.size());
            }
            executor.shutdown();
        }
    }

    /**
     * Sleeps for {@link #SWEEP_INTERVAL_MILLIS}.
     *
     * @return true if the full pause elapsed; false if the thread was interrupted, in which case the
     * interrupt status is restored for the caller
     */
    private static boolean pauseBetweenSweeps() {
        try {
            TimeUnit.MILLISECONDS.sleep(SWEEP_INTERVAL_MILLIS);
            return true;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Cancels the scheduled task of every consumer that reports it is finished and removes that
     * consumer from the map.
     *
     * @param consumerMap consumers mapped to the future of their scheduled runner
     */
    private static void processAllConsumers(final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
        final Set<StreamConsumer> deadConsumers = new HashSet<>();
        for (final Map.Entry<StreamConsumer, ScheduledFuture<?>> entry : consumerMap.entrySet()) {
            if (entry.getKey().isConsumerFinished()) {
                entry.getValue().cancel(true);
                deadConsumers.add(entry.getKey());
            }
        }
        for (final StreamConsumer consumer : deadConsumers) {
            LOGGER.debug("removing consumer " + consumer);
            consumerMap.remove(consumer);
        }
    }

    /**
     * Runnable that invokes {@link StreamConsumer#process()} on its consumer unless the consumer is
     * already finished; an {@link IOException} from processing is logged, not rethrown.
     */
    public static final class ConsumerRunner implements Runnable {

        private final StreamConsumer consumer;

        public ConsumerRunner(final StreamConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            if (consumer.isConsumerFinished()) {
                return;
            }
            try {
                consumer.process();
            } catch (IOException ex) {
                LOGGER.error("", ex);
            }
        }
    }

    /**
     * Factory that creates a {@link UselessStreamConsumer} per stream, schedules a
     * {@link ConsumerRunner} for it with a 10 ms fixed delay and records the resulting future in the
     * shared consumer map.
     */
    public static final class ExampleStreamConsumerFactory implements StreamConsumerFactory {

        final ScheduledExecutorService executor;
        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap;

        public ExampleStreamConsumerFactory(final ScheduledExecutorService executor, final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
            this.executor = executor;
            this.consumerMap = consumerMap;
        }

        @Override
        public StreamConsumer newInstance(final String streamId) {
            final StreamConsumer consumer = new UselessStreamConsumer(streamId);
            final ScheduledFuture<?> future = executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, TimeUnit.MILLISECONDS);
            consumerMap.put(consumer, future);
            LOGGER.info("Added consumer: " + consumer);
            return consumer;
        }
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java new file mode 100644 index 0000000..1c4b70c --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.io.nio.example; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Example client that, from two threads, repeatedly connects to localhost on ports 20001 and 20002
 * and writes a zero-filled buffer over each connection.
 */
public class TCPClient {

    private static final Logger logger = LoggerFactory.getLogger(TCPClient.class);

    public static void main(final String[] args) throws Exception {
        final byte[] bytes = TCPClient.makeBytes();
        final Thread first = newSender(20001, bytes);
        final Thread second = newSender(20002, bytes);
        first.start();
        second.start();
    }

    /**
     * Builds (but does not start) a thread that calls {@link #sendData(int, byte[])} ten times in a
     * row for the given port. Any failure ends the thread's work and is logged.
     *
     * @param port the local port to connect to
     * @param bytes the payload written repeatedly on each connection
     * @return the unstarted sender thread
     */
    private static Thread newSender(final int port, final byte[] bytes) {
        return new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        sendData(port, bytes);
                    }
                } catch (InterruptedException e) {
                    // keep the interrupt visible to whoever owns this thread
                    Thread.currentThread().interrupt();
                    logger.error("Blew exception", e);
                } catch (Exception e) {
                    logger.error("Blew exception", e);
                }
            }
        });
    }

    /**
     * @return a new zero-filled array of {@code 2 << 20} bytes
     */
    public static byte[] makeBytes() {
        return new byte[2 << 20];
    }

    /**
     * Opens a connection to localhost on the given port, waits five seconds, writes the payload
     * 1000 times, flushes and closes the connection, then logs the number of bytes written.
     *
     * @param port the local port to connect to
     * @param bytes the payload to write
     * @throws SocketException if a socket option cannot be set
     * @throws IOException if connecting or writing fails
     * @throws InterruptedException if interrupted during the initial five second wait
     */
    private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException {
        long totalBytes = 0L;
        try (Socket sock = new Socket("localhost", port)) {
            sock.setTcpNoDelay(true);
            sock.setSoTimeout(2000);
            logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything...");
            Thread.sleep(5000L);
            for (int i = 0; i < 1000; i++) {
                sock.getOutputStream().write(bytes);
                totalBytes += bytes.length;
            }
            sock.getOutputStream().flush();
        }
        logger.info("Total bytes sent: " + totalBytes + " to port " + port);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java new file mode 100644 index 0000000..00a00a1 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio.example; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Example client that sends 819200 zero-filled 128-byte datagrams to localhost port 20000 and logs
 * how long the sending took.
 */
public class UDPClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class);

    public static void main(final String[] args) throws Exception {
        final byte[] buffer = UDPClient.makeBytes();
        final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000));
        final long durationMillis;
        // try-with-resources so the socket is released even if a send fails
        try (DatagramSocket socket = new DatagramSocket()) {
            final long startTime = System.nanoTime();
            for (int i = 0; i < 819200; i++) { // 100 MB
                socket.send(packet);
            }
            final long endTime = System.nanoTime();
            durationMillis = (endTime - startTime) / 1000000;
        }
        LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis);
    }

    /**
     * @return a new zero-filled array of 128 bytes, used as the datagram payload
     */
    public static byte[] makeBytes() {
        return new byte[128];
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java new file mode 100644 index 0000000..107c087 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.nio.example; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer; + +/** + * + */ +public class UselessStreamConsumer extends AbstractStreamConsumer { + + public UselessStreamConsumer(final String id) { + super(id); + } + + @Override + protected void processBuffer(final ByteBuffer buffer) throws IOException { + } + + @Override + protected void onConsumerDone() { + System.err.println("IN consumer done"); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml b/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml new file mode 100644 index 0000000..8e93769 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <!-- Appender for printing formatted log statements to the console. --> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %40.40c - %m%n"/> + </layout> + </appender> + + <!-- Logger for managing logging statements for nifi --> + <logger name="nifi"> + <level value="debug"/> + </logger> + + <root> + <level value="warn"/> + <appender-ref ref="console"/> + </root> +</log4j:configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/.gitignore b/nifi-commons/nifi-utils/.gitignore new file mode 100755 index 0000000..12c5231 --- /dev/null +++ b/nifi-commons/nifi-utils/.gitignore @@ -0,0 +1,8 @@ +/target +/target +/target +/target +/target +/target +/target +/target http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/pom.xml b/nifi-commons/nifi-utils/pom.xml new file mode 100644 index 0000000..6927fc0 --- /dev/null +++ b/nifi-commons/nifi-utils/pom.xml @@ -0,0 +1,30 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-utils</artifactId> + <version>0.3.0-SNAPSHOT</version> + <packaging>jar</packaging> + <!-- + This project intentionally has no additional dependencies beyond that pulled in by the parent. It is a general purpose utility library + and should keep its surface/tension minimal. 
+ --> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java new file mode 100644 index 0000000..9b4c3af --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowfile.attributes; + +public enum CoreAttributes implements FlowFileAttributeKey { + + /** + * The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename + */ + PATH("path"), + /** + * The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename + */ + ABSOLUTE_PATH("absolute.path"), + /** + * The filename of the FlowFile. The filename should not contain any directory structure. 
+ */ + FILENAME("filename"), + /** + * A unique UUID assigned to this FlowFile + */ + UUID("uuid"), + /** + * A numeric value indicating the FlowFile priority + */ + PRIORITY("priority"), + /** + * The MIME Type of this FlowFile + */ + MIME_TYPE("mime.type"), + /** + * Specifies the reason that a FlowFile is being discarded + */ + DISCARD_REASON("discard.reason"), + /** + * Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. + */ + ALTERNATE_IDENTIFIER("alternate.identifier"); + + private final String key; + + private CoreAttributes(final String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java new file mode 100644 index 0000000..9637631 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowfile.attributes; + +public interface FlowFileAttributeKey { + + String key(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java new file mode 100644 index 0000000..77c34c9 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +
/**
 * {@link VersionNegotiator} backed by a fixed list of supported versions, kept in the order in
 * which they were passed to the constructor. The first version passed is both the initial current
 * version and the preferred version.
 */
public class StandardVersionNegotiator implements VersionNegotiator {

    private final List<Integer> versions;
    private int curVersion;

    /**
     * @param supportedVersions the supported versions, most preferred first; must not be null or empty
     * @throws NullPointerException if {@code supportedVersions} is null
     * @throws IllegalArgumentException if no version is given
     */
    public StandardVersionNegotiator(final int... supportedVersions) {
        Objects.requireNonNull(supportedVersions);
        if (supportedVersions.length < 1) {
            throw new IllegalArgumentException("At least one version must be supported");
        }

        final List<Integer> boxed = new ArrayList<>();
        for (int i = 0; i < supportedVersions.length; i++) {
            boxed.add(supportedVersions[i]);
        }
        versions = Collections.unmodifiableList(boxed);
        curVersion = supportedVersions[0];
    }

    @Override
    public int getVersion() {
        return curVersion;
    }

    @Override
    public void setVersion(final int version) throws IllegalArgumentException {
        if (isVersionSupported(version)) {
            curVersion = version;
            return;
        }

        throw new IllegalArgumentException("Version " + version + " is not supported");
    }

    @Override
    public int getPreferredVersion() {
        return versions.get(0);
    }

    /**
     * Returns the first supported version, in constructor order, that does not exceed
     * {@code maxVersion}.
     *
     * @param maxVersion the maximum version desired
     * @return the matching version, or null if every supported version is greater than {@code maxVersion}
     */
    @Override
    public Integer getPreferredVersion(final int maxVersion) {
        Integer match = null;
        for (final Integer candidate : versions) {
            if (candidate <= maxVersion) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    @Override
    public boolean isVersionSupported(final int version) {
        return versions.contains(version);
    }

    /**
     * @return an unmodifiable list of the supported versions, in constructor order
     */
    @Override
    public List<Integer> getSupportedVersions() {
        return versions;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java new file mode 100644 index 0000000..d8ee27a ---
/dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.util.List; + +public interface VersionNegotiator { + + /** + * @return the currently configured Version of this resource + */ + int getVersion(); + + /** + * Sets the version of this resource to the specified version. Only the lower byte of the version is relevant. + * + * @param version the version to set + * @throws IllegalArgumentException if the given Version is not supported by this resource, as is indicated by the {@link #isVersionSupported(int)} method + */ + void setVersion(int version) throws IllegalArgumentException; + + /** + * + * @return the Version of this resource that is preferred + */ + int getPreferredVersion(); + + /** + * Gets the preferred version of this resource that is no greater than the given maxVersion. 
If no acceptable version exists that is no greater than <code>maxVersion</code>, then <code>null</code> is + * returned + * + * @param maxVersion the maximum version desired + * @return the preferred version if found; null otherwise + */ + Integer getPreferredVersion(int maxVersion); + + /** + * Indicates whether or not the specified version is supported by this resource + * + * @param version the version to test + * @return true if supported; false otherwise + */ + boolean isVersionSupported(int version); + + List<Integer> getSupportedVersions(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java new file mode 100644 index 0000000..d18c807 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +/** + * Indicates that the user disabled transmission while communications were taking place with a peer + */ +public class TransmissionDisabledException extends RuntimeException { + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java new file mode 100644 index 0000000..6434b2d --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; +
/**
 * InputStream that reads data framed as a sequence of compressed chunks and exposes the inflated
 * bytes. As read by this class, each chunk consists of:
 * <ol>
 * <li>4 bytes that must equal {@code CompressionOutputStream.SYNC_BYTES}</li>
 * <li>a 4-byte big-endian length of the inflated data</li>
 * <li>a 4-byte big-endian length of the compressed data</li>
 * <li>the compressed data itself</li>
 * <li>one indicator byte: 1 if another chunk follows; 0 or end-of-stream if this was the last chunk</li>
 * </ol>
 * NOTE(review): the {@link Inflater} is never {@code end()}-ed, not even by {@link #close()};
 * confirm that relying on garbage collection to release it is acceptable for callers.
 */
public class CompressionInputStream extends InputStream {

    private final InputStream in;
    private final Inflater inflater;

    private byte[] compressedBuffer;
    private byte[] buffer;

    private int bufferIndex;
    private boolean eos = false; // whether or not we've reached the end of stream
    private boolean allDataRead = false; // different from eos b/c eos means allDataRead == true && buffer is empty

    private final byte[] fourByteBuffer = new byte[4];

    /**
     * @param in the stream to read chunks from; it is never closed by this class
     */
    public CompressionInputStream(final InputStream in) {
        this.in = in;
        inflater = new Inflater();

        buffer = new byte[0];
        compressedBuffer = new byte[0];
        bufferIndex = 1; // past the end of the empty buffer, so the first read loads a chunk
    }

    /**
     * Renders the array as {@code 0x} followed by two upper-case hex digits per byte.
     */
    private String toHex(final byte[] array) {
        final StringBuilder sb = new StringBuilder("0x");
        for (final byte b : array) {
            // mask to 0..255: without it a negative byte is sign-extended and prints as FFFFFFxx
            final String hex = Integer.toHexString(b & 0xFF).toUpperCase();
            if (hex.length() == 1) {
                sb.append("0");
            }
            sb.append(hex);
        }
        return sb.toString();
    }

    /**
     * Reads the sync marker and the two length fields of the next chunk and allocates the inflated
     * and compressed buffers accordingly.
     *
     * @throws EOFException if the underlying stream ends inside the header
     * @throws IOException if the sync marker does not match or a length field is negative
     */
    protected void readChunkHeader() throws IOException {
        // Ensure that we have a valid SYNC chunk
        fillBuffer(fourByteBuffer);
        if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
            throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
        }

        // determine the size of the decompressed buffer
        fillBuffer(fourByteBuffer);
        final int inflatedSize = toInt(fourByteBuffer);

        // determine the size of the compressed buffer
        fillBuffer(fourByteBuffer);
        final int compressedSize = toInt(fourByteBuffer);

        // a corrupt header must surface as an IOException, not a NegativeArraySizeException
        if (inflatedSize < 0 || compressedSize < 0) {
            throw new IOException("Invalid chunk header: decompressed size " + inflatedSize + ", compressed size " + compressedSize);
        }

        buffer = new byte[inflatedSize];
        compressedBuffer = new byte[compressedSize];

        bufferIndex = buffer.length; // indicate that buffer is empty
    }

    /**
     * Interprets the first four bytes of the array as a big-endian int.
     */
    private int toInt(final byte[] data) {
        return ((data[0] & 0xFF) << 24)
                | ((data[1] & 0xFF) << 16)
                | ((data[2] & 0xFF) << 8)
                | (data[3] & 0xFF);
    }

    /**
     * Loads and inflates the next chunk, or marks the stream as exhausted if the previous chunk was
     * flagged as the last one.
     *
     * @throws IOException if the chunk is malformed, cannot be inflated, or is followed by an
     * indicator byte greater than 1
     */
    protected void bufferAndDecompress() throws IOException {
        if (allDataRead) {
            eos = true;
            return;
        }

        readChunkHeader();
        fillBuffer(compressedBuffer);

        inflater.setInput(compressedBuffer);
        try {
            inflater.inflate(buffer);
        } catch (final DataFormatException e) {
            throw new IOException(e);
        }
        inflater.reset();

        bufferIndex = 0;
        final int moreDataByte = in.read();
        if (moreDataByte < 1) {
            allDataRead = true;
        } else if (moreDataByte > 1) {
            throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
        }
    }

    /**
     * Reads from the underlying stream until the given array is completely filled.
     *
     * @throws EOFException if the underlying stream stops supplying bytes before the array is full
     */
    private void fillBuffer(final byte[] buffer) throws IOException {
        int len;
        int bytesLeft = buffer.length;
        int bytesRead = 0;
        while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
            bytesLeft -= len;
            bytesRead += len;
        }

        if (bytesRead < buffer.length) {
            throw new EOFException();
        }
    }

    private boolean isBufferEmpty() {
        return bufferIndex >= buffer.length;
    }

    @Override
    public int read() throws IOException {
        if (eos) {
            return -1;
        }

        if (isBufferEmpty()) {
            bufferAndDecompress();
        }

        if (isBufferEmpty()) {
            eos = true;
            return -1;
        }

        return buffer[bufferIndex++] & 0xFF;
    }

    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Copies up to {@code len} inflated bytes into {@code b}. At most the remainder of the current
     * chunk is returned per call, so fewer than {@code len} bytes may be read.
     *
     * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at end of stream
     */
    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        // InputStream contract: a zero-length request reads nothing and returns 0
        if (len == 0) {
            return 0;
        }

        if (eos) {
            return -1;
        }

        if (isBufferEmpty()) {
            bufferAndDecompress();
        }

        if (isBufferEmpty()) {
            eos = true;
            return -1;
        }

        final int free = buffer.length - bufferIndex;
        final int bytesToTransfer = Math.min(len, free);
        System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
        bufferIndex += bytesToTransfer;

        return bytesToTransfer;
    }

    /**
     * Does nothing. Does NOT close underlying InputStream
     *
     * @throws java.io.IOException declared by the overridden method; this implementation does not throw it
     */
    @Override
    public void close() throws IOException {

    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java new file mode 100644 index 0000000..525b5b1 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/**
 * An {@link OutputStream} that buffers written bytes and emits them as independently
 * compressed chunks.
 *
 * <p>Each chunk written by this class consists of the four {@link #SYNC_BYTES}, the number of
 * uncompressed bytes and the number of compressed bytes (each as a 4-byte big-endian integer),
 * followed by the compressed payload. Every chunk after the first is preceded by a single
 * {@code 1} byte; {@link #close()} writes a single {@code 0} byte to mark the end of the data.</p>
 *
 * <p>Closing this stream does NOT close the wrapped stream.</p>
 */
public class CompressionOutputStream extends OutputStream {

    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};

    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
    public static final int MIN_BUFFER_SIZE = 8 << 10;

    private final OutputStream out;
    private final Deflater deflater;

    private final byte[] buffer;
    private byte[] compressed; // grown on demand when a chunk does not compress into the initial size

    private int bufferIndex = 0;
    private boolean dataWritten = false;
    private boolean closed = false;

    /**
     * Creates a stream using {@link #DEFAULT_BUFFER_SIZE}.
     *
     * @param outStream the stream that receives the compressed chunks
     */
    public CompressionOutputStream(final OutputStream outStream) {
        this(outStream, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a stream using {@link #DEFAULT_COMPRESSION_LEVEL} and {@link Deflater#DEFAULT_STRATEGY}.
     *
     * @param outStream the stream that receives the compressed chunks
     * @param bufferSize the maximum number of uncompressed bytes per chunk
     */
    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
    }

    /**
     * @param outStream the stream that receives the compressed chunks
     * @param bufferSize the maximum number of uncompressed bytes per chunk
     * @param level the compression level passed to the {@link Deflater}
     * @param strategy the compression strategy passed to the {@link Deflater}
     * @throws IllegalArgumentException if {@code bufferSize} is smaller than {@link #MIN_BUFFER_SIZE}
     */
    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
        if (bufferSize < MIN_BUFFER_SIZE) {
            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
        }

        this.out = outStream;
        this.deflater = new Deflater(level);
        this.deflater.setStrategy(strategy);
        buffer = new byte[bufferSize];
        compressed = new byte[bufferSize + 64];
    }

    /**
     * Compresses the currently buffered chunk of data and sends it to the output stream.
     * Does nothing when no bytes are buffered.
     *
     * @throws IOException if issues occur writing to stream
     */
    protected void compressAndWrite() throws IOException {
        if (bufferIndex <= 0) {
            return;
        }

        deflater.setInput(buffer, 0, bufferIndex);
        deflater.finish();
        int compressedBytes = deflater.deflate(compressed);

        // Incompressible input can expand beyond the initial output array. The header needs the
        // total compressed size up front, so keep growing the array until the deflater is done
        // instead of silently writing a truncated chunk.
        while (!deflater.finished()) {
            final byte[] larger = new byte[compressed.length + (compressed.length >> 3) + 64];
            System.arraycopy(compressed, 0, larger, 0, compressedBytes);
            compressed = larger;
            compressedBytes += deflater.deflate(compressed, compressedBytes, compressed.length - compressedBytes);
        }

        writeChunkHeader(compressedBytes);
        out.write(compressed, 0, compressedBytes);

        bufferIndex = 0;
        deflater.reset();
    }

    private void writeChunkHeader(final int compressedBytes) throws IOException {
        // If we have already written data, write out a '1' to indicate that we have more data; when we close
        // the stream, we instead write a '0' to indicate that we are finished sending data.
        if (dataWritten) {
            out.write(1);
        }
        out.write(SYNC_BYTES);
        dataWritten = true;

        writeInt(out, bufferIndex);
        writeInt(out, compressedBytes);
    }

    /**
     * Writes the value as four bytes, most significant byte first.
     */
    private void writeInt(final OutputStream out, final int val) throws IOException {
        out.write(val >>> 24);
        out.write(val >>> 16);
        out.write(val >>> 8);
        out.write(val);
    }

    protected boolean bufferFull() {
        return bufferIndex >= buffer.length;
    }

    /**
     * Rejects writes once the stream has been closed, because the deflater has been released.
     */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream is closed");
        }
    }

    @Override
    public void write(final int b) throws IOException {
        ensureOpen();
        buffer[bufferIndex++] = (byte) (b & 0xFF);
        if (bufferFull()) {
            compressAndWrite();
        }
    }

    @Override
    public void write(final byte[] b) throws IOException {
        write(b, 0, b.length);
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        ensureOpen();
        int bytesLeft = len;
        while (bytesLeft > 0) {
            final int free = buffer.length - bufferIndex;
            final int bytesThisIteration = Math.min(bytesLeft, free);
            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
            bufferIndex += bytesThisIteration;

            bytesLeft -= bytesThisIteration;
            if (bufferFull()) {
                compressAndWrite();
            }
        }
    }

    /**
     * Emits any buffered bytes as a chunk and flushes the wrapped stream.
     */
    @Override
    public void flush() throws IOException {
        compressAndWrite();
        // OutputStream.flush() is a no-op, so the wrapped stream has to be flushed explicitly.
        out.flush();
    }

    /**
     * Emits any buffered bytes, writes the end-of-data indicator, flushes the wrapped stream and
     * releases the {@link Deflater}. Does NOT close the wrapped stream. Calling this method more
     * than once has no further effect, so the indicator byte is written only once.
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;

        try {
            compressAndWrite();
            out.write(0); // indicate that the stream is finished.
            out.flush();
        } finally {
            // Release the deflater even when writing the final chunk fails.
            deflater.end();
        }
    }
}
+ */ +package org.apache.nifi.remote.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.remote.exception.TransmissionDisabledException; + +public class InterruptableInputStream extends InputStream { + + private volatile boolean interrupted = false; + private final InputStream in; + + public InterruptableInputStream(final InputStream in) { + this.in = in; + } + + @Override + public int read() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.read(); + } + + @Override + public int read(byte[] b) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.read(b, off, len); + } + + @Override + public int available() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.available(); + } + + @Override + public void close() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + in.close(); + } + + @Override + public synchronized void mark(int readlimit) { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.markSupported(); + } + + @Override + public synchronized void reset() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + in.reset(); + } + + @Override + public long skip(long n) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.skip(n); + } + + public void interrupt() { + interrupted = true; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java new file mode 100644 index 0000000..cba5be6 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.io; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.remote.exception.TransmissionDisabledException; + +public class InterruptableOutputStream extends OutputStream { + + private final OutputStream out; + private volatile boolean interrupted = false; + + public InterruptableOutputStream(final OutputStream out) { + this.out = out; + } + + @Override + public void write(int b) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.write(b, off, len); + } + + @Override + public void close() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.close(); + } + + @Override + public void flush() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.flush(); + } + + public void interrupt() { + this.interrupted = true; + } +}