This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 9ba2806 NIFI-6569, NIFI-6570: Fixed bug that caused read timeouts not
to occur with site-to-site. Fixed bug that caused site-to-site listener not to
accept connections if there are no input/output ports on the root group - this
used to be a valid check and was done to prevent spawning extra threads and
doing extra work if no ports exist but now that we have site-to-site ports
outside of the root group it's no longer a reasonable condition to check.
9ba2806 is described below
commit 9ba280680ff8b40b15d460e6f822c2b10d4373c3
Author: Mark Payne <[email protected]>
AuthorDate: Mon Aug 19 15:29:51 2019 -0400
NIFI-6569, NIFI-6570: Fixed bug that caused read timeouts not to occur with
site-to-site. Fixed bug that caused site-to-site listener not to accept
connections if there are no input/output ports on the root group - this used to
be a valid check and was done to prevent spawning extra threads and doing extra
work if no ports exist but now that we have site-to-site ports outside of the
root group it's no longer a reasonable condition to check.
This closes #3658.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../nifi/remote/io/socket/SocketChannelInput.java | 8 +-
.../remote/io/socket/SocketChannelInputStream.java | 94 ++++++++++++----------
.../nifi/remote/SocketRemoteSiteListener.java | 20 +----
3 files changed, 62 insertions(+), 60 deletions(-)
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
index 85ae504..5cf2a62 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -16,13 +16,14 @@
*/
package org.apache.nifi.remote.io.socket;
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.SocketChannel;
-import org.apache.nifi.remote.io.InterruptableInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
public class SocketChannelInput implements CommunicationsInput {
@@ -62,6 +63,7 @@ public class SocketChannelInput implements
CommunicationsInput {
public void interrupt() {
interruptableIn.interrupt();
+ socketIn.interrupt();
}
@Override
diff --git
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
index c0cfa11..21f1683 100644
---
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -16,19 +16,24 @@
*/
package org.apache.nifi.remote.io.socket;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
public class SocketChannelInputStream extends InputStream {
- private static final long CHANNEL_EMPTY_WAIT_NANOS =
TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
private final SocketChannel channel;
private volatile int timeoutMillis = 30000;
+ private volatile boolean interrupted = false;
+ private final Selector readSelector;
private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
private Byte bufferedByte = null;
@@ -37,6 +42,9 @@ public class SocketChannelInputStream extends InputStream {
// this class expects a non-blocking channel
socketChannel.configureBlocking(false);
this.channel = socketChannel;
+
+ readSelector = Selector.open();
+ this.channel.register(readSelector, SelectionKey.OP_READ);
}
public void setTimeout(final int timeoutMillis) {
@@ -68,32 +76,24 @@ public class SocketChannelInputStream extends InputStream {
final long maxTime = System.currentTimeMillis() + timeoutMillis;
- final boolean blocking = channel.isBlocking();
-
- try {
- channel.configureBlocking(true);
+ waitForReady();
- int bytesRead;
- do {
- bytesRead = channel.read(oneByteBuffer);
- if (bytesRead == 0) {
- if (System.currentTimeMillis() > maxTime) {
- throw new SocketTimeoutException("Timed out reading
from socket");
- }
+ int bytesRead;
+ do {
+ bytesRead = channel.read(oneByteBuffer);
+ if (bytesRead == 0) {
+ if (System.currentTimeMillis() > maxTime) {
+ throw new SocketTimeoutException("Timed out reading from
socket");
}
- } while (bytesRead == 0);
-
- if (bytesRead == -1) {
- return -1;
}
+ } while (bytesRead == 0);
- oneByteBuffer.flip();
- return oneByteBuffer.get() & 0xFF;
- } finally {
- if (!blocking) {
- channel.configureBlocking(false);
- }
+ if (bytesRead == -1) {
+ return -1;
}
+
+ oneByteBuffer.flip();
+ return oneByteBuffer.get() & 0xFF;
}
@Override
@@ -110,29 +110,35 @@ public class SocketChannelInputStream extends InputStream
{
return 1;
}
- final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+ waitForReady();
- final boolean blocking = channel.isBlocking();
- try {
- channel.configureBlocking(true);
-
- final long maxTime = System.currentTimeMillis() + timeoutMillis;
- int bytesRead;
- do {
- bytesRead = channel.read(buffer);
- if (bytesRead == 0) {
- if (System.currentTimeMillis() > maxTime) {
- throw new SocketTimeoutException("Timed out reading
from socket");
- }
+ final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+ final long maxTime = System.currentTimeMillis() + timeoutMillis;
+ int bytesRead;
+ do {
+ bytesRead = channel.read(buffer);
+ if (bytesRead == 0) {
+ if (System.currentTimeMillis() > maxTime) {
+ throw new SocketTimeoutException("Timed out reading from
socket");
}
- } while (bytesRead == 0);
+ }
+ } while (bytesRead == 0);
- return bytesRead;
- } finally {
- if (!blocking) {
- channel.configureBlocking(false);
+ return bytesRead;
+ }
+
+ private void waitForReady() throws IOException {
+ int readyCount = readSelector.select(timeoutMillis);
+ if (readyCount < 1) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
}
+
+ throw new SocketTimeoutException("Timed out reading from socket");
}
+
+ final Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
+ selectedKeys.clear(); // clear the selected keys so that the Selector
will be able to add them back to the ready set next time they are ready.
}
@Override
@@ -164,6 +170,11 @@ public class SocketChannelInputStream extends InputStream {
return false;
}
+ public void interrupt() {
+ interrupted = true;
+ readSelector.wakeup();
+ }
+
/**
* Closes the underlying socket channel.
*
@@ -172,5 +183,6 @@ public class SocketChannelInputStream extends InputStream {
@Override
public void close() throws IOException {
channel.close();
+ readSelector.close();
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index f864ab7..6fa86ff 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -88,7 +88,7 @@ public class SocketRemoteSiteListener implements
RemoteSiteListener {
@Override
public void start() throws IOException {
final boolean secure = (sslContext != null);
- final List<Thread> threads = new ArrayList<Thread>();
+ final List<Thread> threads = new ArrayList<>();
final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
@@ -101,17 +101,6 @@ public class SocketRemoteSiteListener implements
RemoteSiteListener {
@Override
public void run() {
while (!stopped.get()) {
- final ProcessGroup processGroup = rootGroup.get();
- // If nodeInformant is not null, we are in clustered mode,
which means that we don't care about
- // the processGroup.
- if ((nodeInformant == null) && (processGroup == null ||
(processGroup.getInputPorts().isEmpty() &&
processGroup.getOutputPorts().isEmpty()))) {
- try {
- Thread.sleep(2000L);
- } catch (final Exception e) {
- }
- continue;
- }
-
LOG.trace("Accepting Connection...");
Socket acceptedSocket = null;
try {
@@ -241,8 +230,7 @@ public class SocketRemoteSiteListener implements
RemoteSiteListener {
commsSession.setTimeout((int)
protocol.getRequestExpiration());
- LOG.info("Successfully negotiated
ServerProtocol {} Version {} with {}", new Object[]{
- protocol.getResourceName(),
protocol.getVersionNegotiator().getVersion(), peer});
+ LOG.info("Successfully negotiated
ServerProtocol {} Version {} with {}", protocol.getResourceName(),
protocol.getVersionNegotiator().getVersion(), peer);
try {
while (!protocol.isShutdown()) {
@@ -258,7 +246,7 @@ public class SocketRemoteSiteListener implements
RemoteSiteListener {
// Give the timeout a bit
longer (twice as long) to receive the Request Type,
// in order to attempt to
receive more data without shutting down the socket if we don't
// have to.
- LOG.debug("{} Timed out
waiting to receive RequestType using {} with {}", new Object[]{this, protocol,
peer});
+ LOG.debug("{} Timed out
waiting to receive RequestType using {} with {}", this, protocol, peer);
timeoutCount++;
requestType = null;
@@ -377,7 +365,7 @@ public class SocketRemoteSiteListener implements
RemoteSiteListener {
public void destroy() {
}
- private void verifyMagicBytes(final InputStream in, final String
peerDescription) throws IOException, HandshakeException {
+ private void verifyMagicBytes(final InputStream in, final String
peerDescription) throws IOException {
final byte[] receivedMagicBytes = new
byte[CommunicationsSession.MAGIC_BYTES.length];
// expect magic bytes