http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java new file mode 100644 index 0000000..c52b4b7 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.util; + +import java.util.Set; + +import org.apache.nifi.remote.PeerStatus; + +public class PeerStatusCache { + + private final Set<PeerStatus> statuses; + private final long timestamp; + + public PeerStatusCache(final Set<PeerStatus> statuses) { + this(statuses, System.currentTimeMillis()); + } + + public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) { + this.statuses = statuses; + this.timestamp = timestamp; + } + + public Set<PeerStatus> getStatuses() { + return statuses; + } + + public long getTimestamp() { + return timestamp; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java new file mode 100644 index 0000000..70bb324 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.util; + +import java.io.InputStream; +import java.util.Map; + +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.MinimumLengthInputStream; + +public class StandardDataPacket implements DataPacket { + + private final Map<String, String> attributes; + private final InputStream stream; + private final long size; + + public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) { + this.attributes = attributes; + this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size); + this.size = size; + } + + public Map<String, String> getAttributes() { + return attributes; + } + + public InputStream getData() { + return stream; + } + + public long getSize() { + return size; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java new file mode 100644 index 0000000..8336745 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.junit.Test; + +public class TestEndpointConnectionStatePool { + + @Test + public void testFormulateDestinationListForOutput() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); + collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240)); + collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024)); + collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); + collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE); + for (final PeerStatus peerStatus : destinations) { + System.out.println(peerStatus.getPeerDescription()); + } + } + + @Test + public void testFormulateDestinationListForOutputHugeDifference() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new 
ArrayList<>(); + collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500)); + collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE); + for (final PeerStatus peerStatus : destinations) { + System.out.println(peerStatus.getPeerDescription()); + } + } + + @Test + public void testFormulateDestinationListForInputPorts() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); + collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240)); + collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024)); + collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); + collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for (final PeerStatus peerStatus : destinations) { + System.out.println(peerStatus.getPeerDescription()); + } + } + + @Test + public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500)); + collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for (final PeerStatus 
peerStatus : destinations) { + System.out.println(peerStatus.getPeerDescription()); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java new file mode 100644 index 0000000..155fc95 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.client.socket; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.StreamUtils; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class TestSiteToSiteClient { + + @Test + @Ignore("For local testing only; not really a unit test but a manual test") + public void testReceive() throws IOException { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + + final SiteToSiteClient client = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("cba") + .requestBatchCount(10) + .build(); + + try { + for (int i = 0; i < 1000; i++) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + Assert.assertNotNull(transaction); + + DataPacket packet; + while (true) { + packet = transaction.receive(); + if (packet == null) { + break; + } + + final InputStream in = packet.getData(); + final long size = packet.getSize(); + final byte[] buff = new byte[(int) size]; + + StreamUtils.fillBuffer(in, buff); + } + + transaction.confirm(); + transaction.complete(); + } + } finally { + client.close(); + } + } + + @Test + @Ignore("For local testing only; not really a unit test but a manual test") + public void testSend() throws IOException { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + + final SiteToSiteClient client = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("input") + .build(); + + try { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + 
Assert.assertNotNull(transaction); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("site-to-site", "yes, please!"); + final byte[] bytes = "Hello".getBytes(); + final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); + transaction.send(packet); + + transaction.confirm(); + transaction.complete(); + } finally { + client.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/pom.xml b/nifi-commons/nifi-socket-utils/pom.xml new file mode 100644 index 0000000..4ed05ef --- /dev/null +++ b/nifi-commons/nifi-socket-utils/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-socket-utils</artifactId> + <description>Utilities for socket communication</description> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-logging-utils</artifactId> + </dependency> + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java new file mode 100644 index 0000000..cc24575 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.nio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.io.nio.consumer.StreamConsumer; +import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractChannelReader implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class); + private final String uniqueId; + private final SelectionKey key; + private final BufferPool bufferPool; + private final StreamConsumer consumer; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);//the future on which this reader runs... 
+ + public AbstractChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { + this.uniqueId = id; + this.key = key; + this.bufferPool = empties; + this.consumer = consumerFactory.newInstance(id); + consumer.setReturnBufferQueue(bufferPool); + } + + protected void setScheduledFuture(final ScheduledFuture<?> future) { + this.future.set(future); + } + + protected ScheduledFuture<?> getScheduledFuture() { + return future.get(); + } + + protected SelectionKey getSelectionKey() { + return key; + } + + public boolean isClosed() { + return isClosed.get(); + } + + private void closeStream() { + if (isClosed.get()) { + return; + } + try { + isClosed.set(true); + future.get().cancel(false); + key.cancel(); + key.channel().close(); + } catch (final IOException ioe) { + LOGGER.warn("Unable to cleanly close stream due to " + ioe); + } finally { + consumer.signalEndOfStream(); + } + } + + /** + * Allows a subclass to specifically handle how it reads from the given + * key's channel into the given buffer. + * + * @param key of channel to read from + * @param buffer to fill + * @return the number of bytes read in the final read cycle. A value of zero + * or more indicates the channel is still open but a value of -1 indicates + * end of stream. 
+ * @throws IOException if reading from channel causes failure + */ + protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException; + + @Override + public final void run() { + if (!key.isValid() || consumer.isConsumerFinished()) { + closeStream(); + return; + } + if (!key.isReadable()) { + return;//there is nothing available to read...or we aren't allow to read due to throttling + } + ByteBuffer buffer = null; + try { + buffer = bufferPool.poll(); + if (buffer == null) { + return; // no buffers available - come back later + } + final int bytesRead = fillBuffer(key, buffer); + buffer.flip(); + if (buffer.remaining() > 0) { + consumer.addFilledBuffer(buffer); + buffer = null; //clear the reference - is now the consumer's responsiblity + } else { + buffer.clear(); + bufferPool.returnBuffer(buffer, 0); + buffer = null; //clear the reference - is now back to the queue + } + if (bytesRead < 0) { //we've reached the end + closeStream(); + } + } catch (final Exception ioe) { + closeStream(); + LOGGER.error("Closed channel reader " + this + " due to " + ioe); + } finally { + if (buffer != null) { + buffer.clear(); + bufferPool.returnBuffer(buffer, 0); + } + } + } + + @Override + public final boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + AbstractChannelReader rhs = (AbstractChannelReader) obj; + return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals(); + } + + @Override + public final int hashCode() { + return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode(); + } + + @Override + public final String toString() { + return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java 
---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java new file mode 100644 index 0000000..007034b --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BufferPool implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class); + final BlockingQueue<ByteBuffer> bufferPool; + private final static double ONE_MB = 1 << 20; + private Calendar lastRateSampleTime = Calendar.getInstance(); + private final Calendar startTime = Calendar.getInstance(); + double lastRateSampleMBps = -1.0; + double overallMBps = -1.0; + private long totalBytesExtracted = 0L; + private long lastTotalBytesExtracted = 0L; + final double maxRateMBps; + + public BufferPool(final int bufferCount, final int bufferCapacity, final boolean allocateDirect, final double maxRateMBps) { + bufferPool = new LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, allocateDirect)); + this.maxRateMBps = maxRateMBps; + } + + /** + * Returns the given buffer to the pool - and clears it. 
+ * + * @param buffer buffer to return + * @param bytesProcessed bytes processed for this buffer being returned + * @return true if buffer returned to pool + */ + public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) { + totalBytesExtracted += bytesProcessed; + buffer.clear(); + return bufferPool.add(buffer); + } + + //here we enforce the desired rate we want by restricting access to buffers when we're over rate + public synchronized ByteBuffer poll() { + computeRate(); + final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 0.3); + if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) { + return null; + } + return bufferPool.poll(); + } + + public int size() { + return bufferPool.size(); + } + + private synchronized void computeRate() { + final Calendar now = Calendar.getInstance(); + final long measurementDurationMillis = now.getTimeInMillis() - lastRateSampleTime.getTimeInMillis(); + final double duractionSecs = ((double) measurementDurationMillis) / 1000.0; + if (duractionSecs >= 0.75) { //recompute every 3/4 second or when we're too fast + final long totalDuractionMillis = now.getTimeInMillis() - startTime.getTimeInMillis(); + final double totalDurationSecs = ((double) totalDuractionMillis) / 1000.0; + final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted; + lastTotalBytesExtracted = totalBytesExtracted; + lastRateSampleTime = now; + final double bps = ((double) differenceBytes) / duractionSecs; + final double totalBps = ((double) totalBytesExtracted / totalDurationSecs); + lastRateSampleMBps = bps / ONE_MB; + overallMBps = totalBps / ONE_MB; + } + } + + public static List<ByteBuffer> createBuffers(final int bufferCount, final int bufferCapacity, final boolean allocateDirect) { + final List<ByteBuffer> buffers = new ArrayList<>(); + for (int i = 0; i < bufferCount; i++) { + final ByteBuffer buffer = (allocateDirect) ? 
ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity); + buffers.add(buffer); + } + return buffers; + } + + private void logChannelReadRates() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, totalBytesExtracted)); + } + } + + @Override + public void run() { + computeRate(); + logChannelReadRates(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java new file mode 100644 index 0000000..a4308e3 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio; + +import java.io.IOException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ChannelDispatcher implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class); + private final Selector serverSocketSelector; + private final Selector socketChannelSelector; + private final ScheduledExecutorService executor; + private final BufferPool emptyBuffers; + private final StreamConsumerFactory factory; + private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS); + private final long timeout; + private final boolean readSingleDatagram; + private volatile boolean stop = false; + public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L; + + public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service, + final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit, final boolean readSingleDatagram) { + this.serverSocketSelector = serverSocketSelector; + this.socketChannelSelector = socketChannelSelector; + this.executor = service; + this.factory = factory; + emptyBuffers = buffers; + this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + this.readSingleDatagram = readSingleDatagram; + } + + public void 
setChannelReaderFrequency(final long period, final TimeUnit timeUnit) { + channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit)); + }

    /*
     * Dispatch loop. Until the stop flag is set, each pass first services the server-socket selector and then the
     * socket-channel selector. Any exception thrown by a pass is logged at WARN level and the loop carries on.
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                selectServerSocketKeys();
                selectSocketChannelKeys();
            } catch (final Exception ex) {
                // Logged instead of rethrown so that a single failed pass does not terminate the loop.
                LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex});
            }
        }
    }

    /*
     * Waits up to 'timeout' on the server-socket selector and then walks every selected key. For each valid, acceptable
     * key the pending connection is accepted, switched to non-blocking mode, registered for OP_READ with the
     * socket-channel selector, and given a SocketChannelReader that is scheduled on the executor with a fixed delay.
     * The reader is attached to the new key. Every visited key is removed from the selected-key set.
     *
     * @throws IOException if unable to select keys
     */
    private void selectServerSocketKeys() throws IOException {
        int numSelected = serverSocketSelector.select(timeout);
        if (numSelected == 0) {
            return;
        }

        // for each registered server socket - see if any connections are waiting to be established
        final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator();
        while (itr.hasNext()) {
            SelectionKey serverSocketkey = itr.next();
            final SelectableChannel channel = serverSocketkey.channel();
            AbstractChannelReader reader = null;
            if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) {
                final ServerSocketChannel ssChannel = (ServerSocketChannel) serverSocketkey.channel();
                // accept() can return null here; the null check below skips reader creation in that case
                final SocketChannel sChannel = ssChannel.accept();
                if (sChannel != null) {
                    sChannel.configureBlocking(false);
                    final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
                    final String readerId = sChannel.socket().toString();
                    reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory);
                    // first run after 10 ms, then repeated with the currently configured reader frequency as the delay
                    final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L,
                            channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS);
                    reader.setScheduledFuture(readerFuture);
                    socketChannelKey.attach(reader);
                }
            }
            itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0.
            if (reader != null && LOGGER.isDebugEnabled()) {
                LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
            }
        }
    }

    /*
     * Waits up to 'timeout' on the socket-channel selector and then iterates the selected keys once. When a key is
     * entered into the selector's selected-key set, select returns a positive value; the next select returns 0 if
     * nothing has changed. Note that the selected-key set is deliberately NOT modified via a remove operation here.
     *
     * The only work done per key is to create, attach and schedule a DatagramChannelReader for a DatagramChannel key
     * that has no attachment yet.
     *
     * @throws IOException if unable to select keys
     */
    private void selectSocketChannelKeys() throws IOException {
        // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
        // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
        int numSelected = socketChannelSelector.select(timeout);
        if (numSelected == 0) {
            return;
        }

        for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
            final SelectableChannel channel = socketChannelKey.channel();
            AbstractChannelReader reader = null;
            // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
            // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
            // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
            // way to tell if it's new is the lack of an attachment.
            if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
                reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
                // attached before scheduling, so the next pass sees an attachment and does not create a second reader
                socketChannelKey.attach(reader);
                final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
                        TimeUnit.MILLISECONDS);
                reader.setScheduledFuture(readerFuture);
            }
            if (reader != null && LOGGER.isDebugEnabled()) {
                LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
            }
        }

    }

    /*
     * Sets the stop flag that the run() loop checks at the top of every pass.
     * NOTE(review): the declaration of 'stop' is outside this view - confirm it is volatile so the dispatcher thread sees the update.
     */
    public void stop() {
        stop = true;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java new file mode 100644 index 0000000..ab77063 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides the entry point to NIO based socket listeners for NiFi
 * processors and services. There are 2 supported types of Listeners, Datagram
 * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
 * will create the ChannelDispatcher, which is a Runnable and is controlled via
 * the ScheduledExecutorService, which is also created by this class. The
 * ChannelDispatcher handles connections to the ServerSocketChannels and creates
 * the readers associated with the resulting SocketChannels. Additionally, this
 * creates and manages two Selectors, one for ServerSocketChannels and another
 * for SocketChannels and DatagramChannels.
 *
 * The threading model for this consists of one thread for the
 * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
 * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
 * fixed delay as the others are. It is throttled by the provided timeout value.
 * Within the ChannelDispatcher there are two blocking operations which will
 * block for the given timeout each time through the enclosing loop.
 *
 * All channels are cached in one of the two Selectors via their SelectionKey.
 * The serverSocketSelector maintains all the added ServerSocketChannels; the
 * socketChannelSelector maintains all the added DatagramChannels and the
 * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
 * the SocketChannel is injected with the channel's associated reader.
 *
 * All ChannelReaders will get throttled by the unavailability of buffers in the
 * provided BufferPool. This is designed to create back pressure.
 *
 */
public final class ChannelListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
    private final ScheduledExecutorService executor;
    private final Selector serverSocketSelector; // used to listen for new connections
    private final Selector socketChannelSelector; // used to listen on existing connections
    private final ChannelDispatcher channelDispatcher;
    private final BufferPool bufferPool;
    private final int initialBufferPoolSize;
    private volatile long channelReaderFrequencyMSecs = 50;

    /**
     * Opens both selectors, creates the dispatcher and schedules it to start
     * after 50 milliseconds.
     *
     * @param threadPoolSize number of reader threads; one extra thread is
     * added for the long running dispatcher
     * @param consumerFactory factory handed to the dispatcher for the readers
     * it creates
     * @param bufferPool pool of buffers shared by all readers; its size at
     * construction is remembered for the leak check in shutdown
     * @param timeout select timeout handed to the dispatcher
     * @param unit unit of the timeout
     * @param readSingleDatagram handed to the dispatcher for datagram readers
     * @throws IOException if a selector cannot be opened
     */
    public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
            TimeUnit unit, final boolean readSingleDatagram) throws IOException {
        this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
        this.serverSocketSelector = Selector.open();
        this.socketChannelSelector = Selector.open();
        this.bufferPool = bufferPool;
        this.initialBufferPoolSize = bufferPool.size();
        channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
                timeout, unit, readSingleDatagram);
        executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
    }

    /**
     * Sets how often channel readers are scheduled. The value is also used as
     * the polling interval while shutdown waits for readers to close.
     *
     * @param period scheduling period
     * @param unit unit of the period
     */
    public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) {
        channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit);
        channelDispatcher.setChannelReaderFrequency(period, unit);
    }

    /**
     * Adds a server socket channel for listening to connections.
     *
     * @param nicIPAddress - if null binds to wildcard address
     * @param port - port to bind to
     * @param receiveBufferSize - size of OS receive buffer to request. If not
     * greater than 0 then will not be set and OS default will win.
     * @throws IOException if unable to add socket
     */
    public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
            throws IOException {
        final ServerSocketChannel ssChannel = ServerSocketChannel.open();
        try {
            ssChannel.configureBlocking(false);
            if (receiveBufferSize > 0) {
                ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
                warnIfReceiveBufferTooSmall("TCP", receiveBufferSize, ssChannel.getOption(StandardSocketOptions.SO_RCVBUF));
            }
            ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
            ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
        } catch (final IOException | RuntimeException e) {
            // the channel was opened but never handed to the selector; close it so the descriptor is not leaked
            IOUtils.closeQuietly(ssChannel);
            throw e;
        }
    }

    /**
     * Binds to listen for data grams on the given local IPAddress/port
     *
     * @param nicIPAddress - if null will listen on wildcard address, which
     * means datagrams will be received on all local network interfaces.
     * Otherwise, will bind to the provided IP address associated with some NIC.
     * @param port - the port to listen on
     * @param receiveBufferSize - the number of bytes to request for a receive
     * buffer from OS
     * @throws IOException if unable to add channel
     */
    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
            throws IOException {
        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
        try {
            dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
        } catch (final IOException | RuntimeException e) {
            IOUtils.closeQuietly(dChannel);
            throw e;
        }
    }

    /**
     * Binds to listen for data grams on the given local IPAddress/port and
     * restricts receipt of datagrams to those from the provided host and port,
     * must specify both. This improves performance for datagrams coming from a
     * sender that is known a-priori. If either the sending host or the sending
     * port is null this behaves exactly like the three argument overload.
     *
     * @param nicIPAddress - if null will listen on wildcard address, which
     * means datagrams will be received on all local network interfaces.
     * Otherwise, will bind to the provided IP address associated with some NIC.
     * @param port - the port to listen on. This is used to provide a well-known
     * destination for a sender.
     * @param receiveBufferSize - the number of bytes to request for a receive
     * buffer from OS
     * @param sendingHost - the hostname, or IP address, of the sender of
     * datagrams. Only datagrams from this host will be received. If this is
     * null no sender restriction is applied.
     * @param sendingPort - the port used by the sender of datagrams. Only
     * datagrams from this port will be received. If this is null no sender
     * restriction is applied.
     * @throws IOException if unable to add channel
     */
    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
            final Integer sendingPort) throws IOException {

        if (sendingHost == null || sendingPort == null) {
            addDatagramChannel(nicIPAddress, port, receiveBufferSize);
            return;
        }
        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
        try {
            dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
            dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
        } catch (final IOException | RuntimeException e) {
            IOUtils.closeQuietly(dChannel);
            throw e;
        }
    }

    /**
     * Opens a non-blocking datagram channel, applies the receive buffer size
     * (when greater than 0) and SO_REUSEADDR, and binds it. The channel is
     * closed again if any of those steps fails.
     */
    private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
            throws IOException {
        final DatagramChannel dChannel = DatagramChannel.open();
        try {
            dChannel.configureBlocking(false);
            if (receiveBufferSize > 0) {
                dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
                warnIfReceiveBufferTooSmall("UDP", receiveBufferSize, dChannel.getOption(StandardSocketOptions.SO_RCVBUF));
            }
            dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            dChannel.bind(new InetSocketAddress(nicIPAddress, port));
        } catch (final IOException | RuntimeException e) {
            IOUtils.closeQuietly(dChannel);
            throw e;
        }
        return dChannel;
    }

    /**
     * Logs a warning when the OS granted a smaller receive buffer than was requested.
     */
    private void warnIfReceiveBufferTooSmall(final String protocol, final int requested, final int actual) {
        if (actual < requested) {
            LOGGER.warn(this + " attempted to set " + protocol + " Receive Buffer Size to "
                    + requested + " bytes but could only set to " + actual
                    + " bytes. You may want to consider changing the Operating System's "
                    + "maximum receive buffer");
        }
    }

    /**
     * Polls, at the reader scheduling period, until the given reader reports
     * closed. If the calling thread is interrupted the wait is abandoned and
     * the interrupt status is restored so that callers can observe it.
     */
    private void awaitReaderClosed(final AbstractChannelReader reader) {
        while (!reader.isClosed()) {
            try {
                Thread.sleep(channelReaderFrequencyMSecs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Stops the dispatcher, cancels every key of both selectors, waits for
     * attached readers to close, closes all channels and selectors and shuts
     * the executor down, waiting up to the given period for it to terminate.
     * Finally logs whether the buffer pool size differs from its initial size.
     *
     * If the calling thread is interrupted, the remaining waits are skipped,
     * resources are still closed, and the interrupt status is left set.
     *
     * @param period maximum time to wait for the executor to terminate
     * @param timeUnit unit of the period
     */
    public void shutdown(final long period, final TimeUnit timeUnit) {
        channelDispatcher.stop();
        for (SelectionKey selectionKey : socketChannelSelector.keys()) {
            final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment();
            selectionKey.cancel();
            if (reader != null) {
                awaitReaderClosed(reader);
                // the dispatcher attaches some readers before recording their future, so guard against null
                final ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
                if (readerFuture != null) {
                    readerFuture.cancel(false);
                }
            }
            IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist...
        }
        IOUtils.closeQuietly(socketChannelSelector);

        for (SelectionKey selectionKey : serverSocketSelector.keys()) {
            selectionKey.cancel();
            IOUtils.closeQuietly(selectionKey.channel());
        }
        IOUtils.closeQuietly(serverSocketSelector);
        executor.shutdown();
        try {
            executor.awaitTermination(period, timeUnit);
        } catch (final InterruptedException ex) {
            LOGGER.warn("Interrupted while trying to shutdown executor");
            Thread.currentThread().interrupt();
        }
        final int currentBufferPoolSize = bufferPool.size();
        final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
                + " Current buffer count=" + currentBufferPoolSize
                + " Could indicate a buffer leak. Ensure all consumers are executed until they complete." : "";
        LOGGER.info("Channel listener shutdown. " + warning);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java new file mode 100644 index 0000000..a4670b9 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;

import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

/**
 * Channel reader that fills buffers from a {@link DatagramChannel}.
 */
public final class DatagramChannelReader extends AbstractChannelReader {

    public static final int MAX_UDP_PACKET_SIZE = 65507;

    private final boolean readSingleDatagram;

    public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory,
            final boolean readSingleDatagram) {
        super(id, key, empties, consumerFactory);
        this.readSingleDatagram = readSingleDatagram;
    }

    /**
     * Receives datagrams from the key's channel into the buffer. A receive is
     * only attempted while the buffer has more than
     * {@link #MAX_UDP_PACKET_SIZE} bytes remaining and the key is valid and
     * readable. Receiving stops as soon as the channel has no datagram
     * available, or after the first receive when this reader was created with
     * {@code readSingleDatagram} set.
     *
     * @param key selection key whose channel is a DatagramChannel
     * @param buffer buffer to fill
     * @return number of bytes added to the buffer by this call
     * @throws IOException if receiving from the channel fails
     */
    @Override
    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
        final DatagramChannel datagramChannel = (DatagramChannel) key.channel();
        final int startPosition = buffer.position();
        for (;;) {
            final boolean mayReceive = buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable();
            if (!mayReceive) {
                break;
            }
            final boolean nothingReceived = datagramChannel.receive(buffer) == null;
            if (nothingReceived || readSingleDatagram) {
                break;
            }
        }
        return buffer.position() - startPosition;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java new file mode 100644 index 0000000..29c2973 --- /dev/null +++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.nio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; + +public final class SocketChannelReader extends AbstractChannelReader { + + public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { + super(id, key, empties, consumerFactory); + } + + /** + * Receives TCP data from the socket channel for the given key. 
+ * + * @param key selection key + * @param buffer byte buffer to fill + * @return bytes read + * @throws IOException if error reading bytes + */ + @Override + protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException { + int bytesRead = 0; + final SocketChannel sChannel = (SocketChannel) key.channel(); + while (key.isValid() && key.isReadable()) { + bytesRead = sChannel.read(buffer); + if (bytesRead <= 0) { + break; + } + } + return bytesRead; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java new file mode 100644 index 0000000..bb57e26 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.io.nio.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

/**
 * Base {@link StreamConsumer} that queues filled buffers, hands them one at a
 * time to {@link #processBuffer(ByteBuffer)} and returns each buffer to the
 * pool afterwards. Equality, hash code and string form are based solely on the
 * consumer's id.
 */
public abstract class AbstractStreamConsumer implements StreamConsumer {

    private final String uniqueId;
    private BufferPool bufferPool = null;
    private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
    private final AtomicBoolean streamEnded = new AtomicBoolean(false);
    private final AtomicBoolean consumerEnded = new AtomicBoolean(false);

    public AbstractStreamConsumer(final String id) {
        uniqueId = id;
    }

    @Override
    public final void setReturnBufferQueue(final BufferPool returnQueue) {
        bufferPool = returnQueue;
    }

    /**
     * Queues the buffer for processing, or, when this consumer has already
     * finished, clears it and hands it straight back to the pool.
     *
     * @param buffer filled buffer
     */
    @Override
    public final void addFilledBuffer(final ByteBuffer buffer) {
        if (isConsumerFinished()) {
            buffer.clear();
            // NOTE(review): after clear() remaining() equals the buffer capacity, whereas process() passes the
            // number of bytes that were available for processing - confirm which value BufferPool.returnBuffer expects.
            bufferPool.returnBuffer(buffer, buffer.remaining());
        } else {
            filledBuffers.add(buffer);
        }
    }

    /**
     * Processes at most one queued buffer. Once end of stream has been
     * signalled and the queue is empty the consumer is marked finished and
     * {@link #onConsumerDone()} is invoked exactly once, even if several
     * threads call this method at the same time.
     *
     * @throws IOException if processing the buffer fails
     */
    @Override
    public final void process() throws IOException {
        if (isConsumerFinished()) {
            return;
        }
        if (streamEnded.get() && filledBuffers.isEmpty()) {
            // compareAndSet guarantees only one caller performs the transition and fires the callback
            if (consumerEnded.compareAndSet(false, true)) {
                onConsumerDone();
            }
            return;
        }
        final ByteBuffer buffer = filledBuffers.poll();
        if (buffer != null) {
            final int bytesToProcess = buffer.remaining();
            try {
                processBuffer(buffer);
            } finally {
                // always give the buffer back, even when processing throws, so the pool does not drain
                buffer.clear();
                bufferPool.returnBuffer(buffer, bytesToProcess);
            }
        }
    }

    /**
     * Consumes the content of one filled buffer.
     *
     * @param buffer buffer to consume; it is cleared and returned to the pool
     * by the caller once this method returns or throws
     * @throws IOException if the content cannot be processed
     */
    protected abstract void processBuffer(ByteBuffer buffer) throws IOException;

    @Override
    public final void signalEndOfStream() {
        streamEnded.set(true);
    }

    /**
     * Convenience method that is called when the consumer is done processing
     * based on being told the signal is end of stream and has processed all
     * given buffers.
     */
    protected void onConsumerDone() {
    }

    @Override
    public final boolean isConsumerFinished() {
        return consumerEnded.get();
    }

    @Override
    public final String getId() {
        return uniqueId;
    }

    /**
     * Two consumers are equal when they are of the same class and have the
     * same id, which keeps this method in line with {@link #hashCode()}.
     */
    @Override
    public final boolean equals(final Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
        // No appendSuper(super.equals(obj)): the superclass is Object, whose equals is identity, which would
        // make this comparison false for every distinct instance regardless of id.
        return new EqualsBuilder().append(uniqueId, rhs.uniqueId).isEquals();
    }

    @Override
    public final int hashCode() {
        return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
    }

    @Override
    public final String toString() {
        return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java new file mode 100644 index 0000000..cac8d8b --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.io.nio.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.nifi.io.nio.BufferPool;

/**
 * Consumer of the byte buffers produced for one stream.
 *
 * A StreamConsumer must be thread safe. It may be accessed concurrently by a
 * thread providing data to process and another thread that is processing that
 * data.
 *
 */
public interface StreamConsumer {

    /**
     * Will be called once just after construction. It provides the queue to
     * which processed, emptied and cleared buffers must be returned.
     *
     * IMPORTANT: for each call to <code>addFilledBuffer</code> there must be a
     * matching return of a buffer to this queue. If not, buffers will run out
     * and all stream processing will halt.
     *
     * @param returnQueue pool of buffers to use
     */
    void setReturnBufferQueue(BufferPool returnQueue);

    /**
     * Will be called by the thread that produces byte buffers with available
     * data to be processed. If the consumer is finished this should simply
     * return the given buffer to the return buffer queue (after it is cleared).
     *
     * @param buffer filled buffer
     */
    void addFilledBuffer(ByteBuffer buffer);

    /**
     * Will be called by the thread that executes the consumption of data. May
     * be called many times, though once <code>isConsumerFinished</code>
     * returns true this method will likely do nothing.
     *
     * @throws java.io.IOException if there is an issue processing
     */
    void process() throws IOException;

    /**
     * Called once the end of the input stream is detected.
     */
    void signalEndOfStream();

    /**
     * If true signals the consumer is done consuming data and will not process
     * any more buffers.
     *
     * @return true if finished
     */
    boolean isConsumerFinished();

    /**
     * Uniquely identifies the consumer.
     *
     * @return identifier of consumer
     */
    String getId();

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java new file mode 100644 index 0000000..ba858b6 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.io.nio.consumer;

/**
 * Factory for {@link StreamConsumer} instances.
 */
public interface StreamConsumerFactory {

    /**
     * Creates a consumer for the stream with the given id.
     *
     * @param streamId identifier of the stream to consume; presumably becomes
     * the consumer's id - TODO confirm against the implementations
     * @return a stream consumer
     */
    StreamConsumer newInstance(String streamId);

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java new file mode 100644 index 0000000..9c6cb82 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.io.socket;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.nifi.util.NiFiProperties;

/**
 * Builds {@link SSLContext} instances from the keystore and truststore named
 * in a {@link NiFiProperties}. Both stores are read once, in the constructor;
 * every call to {@link #createSslContext()} reuses the resulting key and trust
 * managers.
 */
public class SSLContextFactory {

    private final String keystore;
    private final char[] keystorePass;
    private final String keystoreType;
    private final String truststore;
    private final char[] truststorePass;
    private final String truststoreType;

    private final KeyManager[] keyManagers;
    private final TrustManager[] trustManagers;

    /**
     * Reads the keystore and truststore locations, passwords and types from
     * the given properties, loads both stores from disk and prepares the key
     * and trust managers.
     *
     * @param properties source of the SECURITY_KEYSTORE* and
     * SECURITY_TRUSTSTORE* settings
     * @throws NoSuchAlgorithmException if a required algorithm isn't available
     * @throws CertificateException if a certificate in a store cannot be loaded
     * @throws FileNotFoundException if a store file does not exist
     * @throws IOException if a store file cannot be read
     * @throws KeyStoreException if a store type is not supported
     * @throws UnrecoverableKeyException if a key cannot be recovered with the given password
     */
    public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
        keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
        keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
        keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);

        truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
        truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
        truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);

        // prepare the keystore
        final KeyStore keyStore = loadKeyStore(keystore, keystoreType, keystorePass);
        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        keyManagerFactory.init(keyStore, keystorePass);

        // prepare the truststore
        final KeyStore trustStore = loadKeyStore(truststore, truststoreType, truststorePass);
        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(trustStore);

        keyManagers = keyManagerFactory.getKeyManagers();
        trustManagers = trustManagerFactory.getTrustManagers();
    }

    /**
     * Loads a key store of the given type from the given file, closing the file stream when done.
     */
    private static KeyStore loadKeyStore(final String path, final String type, final char[] password)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
        final KeyStore store = KeyStore.getInstance(type);
        // try-with-resources: the stream was previously left open, leaking a file handle per store
        try (final FileInputStream in = new FileInputStream(path)) {
            store.load(in, password);
        }
        return store;
    }

    private static char[] getPass(final String password) {
        return password == null ? null : password.toCharArray();
    }

    /**
     * Creates a new TLS SSLContext initialised with the key and trust managers
     * prepared by the constructor and a fresh {@link SecureRandom}.
     *
     * @return a SSLContext instance
     * @throws java.security.KeyStoreException if problem with keystore
     * @throws java.io.IOException if unable to create context
     * @throws java.security.NoSuchAlgorithmException if algorithm isn't known
     * @throws java.security.cert.CertificateException if certificate is invalid
     * @throws java.security.UnrecoverableKeyException if the key cannot be recovered
     * @throws java.security.KeyManagementException if the key is improper
     */
    public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
            UnrecoverableKeyException, KeyManagementException {

        // initialize the ssl context
        final SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(keyManagers, trustManagers, new SecureRandom());
        // NOTE(review): getDefaultSSLParameters() returns a copy of the defaults, so this call does not change
        // the context. Client authentication has to be requested on the SSLServerSocket/SSLEngine created from
        // the context; left in place pending confirmation that callers already do so.
        sslContext.getDefaultSSLParameters().setNeedClientAuth(true);

        return sslContext;

    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java new file mode 100644 index 0000000..d6aca92 --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLContext; + +public final class ServerSocketConfiguration { + + private boolean needClientAuth; + private Integer socketTimeout; + private Boolean reuseAddress; + private Integer receiveBufferSize; + private SSLContextFactory sslContextFactory; + + public ServerSocketConfiguration() { + } + + public SSLContext createSSLContext() + throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { + return sslContextFactory == null ? 
null : sslContextFactory.createSslContext(); + } + + public void setSSLContextFactory(final SSLContextFactory sslContextFactory) { + this.sslContextFactory = sslContextFactory; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(Integer socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public boolean getNeedClientAuth() { + return needClientAuth; + } + + public void setNeedClientAuth(boolean needClientAuth) { + this.needClientAuth = needClientAuth; + } + + public Boolean getReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(Boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public Integer getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(Integer receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java new file mode 100644 index 0000000..8b803dc --- /dev/null +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLContext; + +public final class SocketConfiguration { + + private Integer socketTimeout; + private Integer receiveBufferSize; + private Integer sendBufferSize; + private Boolean reuseAddress; + private Boolean keepAlive; + private Boolean oobInline; + private Boolean tcpNoDelay; + private Integer trafficClass; + private SSLContextFactory sslContextFactory; + + public SSLContext createSSLContext() + throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { + return sslContextFactory == null ? 
null : sslContextFactory.createSslContext(); + } + + public void setSSLContextFactory(final SSLContextFactory sslContextFactory) { + this.sslContextFactory = sslContextFactory; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(Integer socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public Boolean getReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(Boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public Boolean getKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(Boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public Boolean getOobInline() { + return oobInline; + } + + public void setOobInline(Boolean oobInline) { + this.oobInline = oobInline; + } + + public Integer getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(Integer receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public Integer getSendBufferSize() { + return sendBufferSize; + } + + public void setSendBufferSize(Integer sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + + public Boolean getTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(Boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public Integer getTrafficClass() { + return trafficClass; + } + + public void setTrafficClass(Integer trafficClass) { + this.trafficClass = trafficClass; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java new file mode 100644 index 0000000..b509035 --- /dev/null +++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.logging.NiFiLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a listener for TCP/IP messages sent over unicast socket. 
+ * + */ +public abstract class SocketListener { + + private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5; + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketListener.class)); + private volatile ExecutorService executorService; // volatile to guarantee most current value is visible + private volatile ServerSocket serverSocket; // volatile to guarantee most current value is visible + private final int numThreads; + private final int port; + private final ServerSocketConfiguration configuration; + private final AtomicInteger shutdownListenerSeconds = new AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS); + + public SocketListener( + final int numThreads, + final int port, + final ServerSocketConfiguration configuration) { + + if (numThreads <= 0) { + throw new IllegalArgumentException("Number of threads may not be less than or equal to zero."); + } else if (configuration == null) { + throw new IllegalArgumentException("Server socket configuration may not be null."); + } + + this.numThreads = numThreads; + this.port = port; + this.configuration = configuration; + } + + /** + * Implements the action to perform when a new socket request is received. + * This class will close the socket. 
+ * + * @param socket the socket + */ + public abstract void dispatchRequest(final Socket socket); + + public void start() throws IOException { + + if (isRunning()) { + return; + } + + try { + serverSocket = SocketUtils.createServerSocket(port, configuration); + } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) { + throw new IOException(e); + } + + final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); + executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() { + private final AtomicLong threadCounter = new AtomicLong(0L); + + @Override + public Thread newThread(final Runnable r) { + final Thread newThread = defaultThreadFactory.newThread(r); + newThread.setName("Process NCM Request-" + threadCounter.incrementAndGet()); + return newThread; + } + }); + + final ExecutorService runnableExecServiceRef = executorService; + final ServerSocket runnableServerSocketRef = serverSocket; + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + while (runnableExecServiceRef.isShutdown() == false) { + Socket socket = null; + try { + try { + socket = runnableServerSocketRef.accept(); + if (configuration.getSocketTimeout() != null) { + socket.setSoTimeout(configuration.getSocketTimeout()); + } + } catch (final SocketTimeoutException ste) { + // nobody connected to us. Go ahead and call closeQuietly just to make sure we don't leave + // any sockets lingering + SocketUtils.closeQuietly(socket); + continue; + } catch (final SocketException se) { + logger.warn("Failed to communicate with " + (socket == null ? 
"Unknown Host" : socket.getInetAddress().getHostName()) + " due to " + se, se); + SocketUtils.closeQuietly(socket); + continue; + } catch (final Throwable t) { + logger.warn("Socket Listener encountered exception: " + t, t); + SocketUtils.closeQuietly(socket); + continue; + } + + final Socket finalSocket = socket; + runnableExecServiceRef.execute(new Runnable() { + @Override + public void run() { + try { + dispatchRequest(finalSocket); + } catch (final Throwable t) { + logger.warn("Dispatching socket request encountered exception due to: " + t, t); + } finally { + SocketUtils.closeQuietly(finalSocket); + } + } + }); + } catch (final Throwable t) { + logger.error("Socket Listener encountered exception: " + t, t); + SocketUtils.closeQuietly(socket); + } + } + } + }); + t.setName("Cluster Socket Listener"); + t.start(); + } + + public boolean isRunning() { + return (executorService != null && executorService.isShutdown() == false); + } + + public void stop() throws IOException { + + if (isRunning() == false) { + return; + } + + // shutdown executor service + try { + if (getShutdownListenerSeconds() <= 0) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + if (executorService.isTerminated()) { + logger.info("Socket Listener has been terminated successfully."); + } else { + logger.warn("Socket Listener has not terminated properly. 
There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); + } + } + + // shutdown server socket + SocketUtils.closeQuietly(serverSocket); + + } + + public int getShutdownListenerSeconds() { + return shutdownListenerSeconds.get(); + } + + public void setShutdownListenerSeconds(final int shutdownListenerSeconds) { + this.shutdownListenerSeconds.set(shutdownListenerSeconds); + } + + public ServerSocketConfiguration getConfiguration() { + return configuration; + } + + public int getPort() { + if (isRunning()) { + return serverSocket.getLocalPort(); + } else { + return port; + } + } + +}