Here's the proposed patch. The changed lines were formatted with the NetBeans 8.2 default formatter. The copyright header has not been added to the new file yet.
> Hi.
> In case of a wrong setup, a `cvs log` command may print nothing. This makes the
> LoggedDataInputStream#read() method in the NetBeans CVS client loop forever.
> See this SO question for details:
> https://stackoverflow.com/questions/48741854/inputstream-wrapper-with-actual-read-in-a-dedicated-thread
>
> There's a bug mentioning sleep() inside LoggedDataInputStream:
> https://netbeans.org/bugzilla/show_bug.cgi?id=254761
>
> There are two types of input streams the LoggedDataInputStream works with. If
> it's a server connection, the stream is a SocketInputStream. If it's a local
> connection, the stream is the read side of a pipe returned by
> Process.getInputStream().
>
> I suppose that we want to keep the Thread.interrupted() check in
> LoggedDataInputStream, but using available() is definitely not the right way
> to check for premature EOF.
> I also suppose that we want to keep the Java 6 level.
>
> I want to remove the sleep() and available() calls from
> LoggedDataInputStream and instead catch the SocketTimeoutException and check
> the interrupted status on a timeout.
>
> In case of a socket we will set the socket timeout to 100ms.
> In case of a pipe we will perform reading in a dedicated thread. See the SO
> question above.
>
>From 5c656b53953da4de424a1ae1c0b2ec12c327177b Mon Sep 17 00:00:00 2001 From: Ilya Basin <[email protected]> Date: Wed, 14 Feb 2018 16:48:46 +0300 Subject: [PATCH 1/2] 254761: unneeded fiddling with thread interrupted status --- src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java index 2108b7d..1ac2033 100644 --- a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java +++ b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java @@ -89,8 +89,7 @@ public ByteArray readLineBytes() throws IOException { boolean throwEOF = true; ByteArray byteArray = new ByteArray(); loop: while (true) { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); + if (Thread.currentThread().isInterrupted()) { break; } if (in.available() == 0) { @@ -131,8 +130,7 @@ public ByteArray readLineBytes() throws IOException { int ch; ByteArray byteArray = new ByteArray(); loop: while (len != 0) { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); + if (Thread.currentThread().isInterrupted()) { break; } if (in.available() == 0) { -- 2.16.1
>From a3f472508ee3c63a7c5336a60945af32498fc394 Mon Sep 17 00:00:00 2001 From: Ilya Basin <[email protected]> Date: Thu, 15 Feb 2018 13:35:21 +0300 Subject: [PATCH 2/2] 254761: CVS don't sleep, don't hang in LoggedDataInputStream --- .../lib/cvsclient/connection/ExtConnection.java | 2 +- .../cvsclient/connection/PServerConnection.java | 4 +- .../cvsclient/util/InterruptibleInputStream.java | 370 +++++++++++++++++++++ .../lib/cvsclient/util/LoggedDataInputStream.java | 136 ++++++-- 4 files changed, 477 insertions(+), 35 deletions(-) create mode 100644 src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java diff --git a/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java b/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java index 5d6f52c..56ef813 100644 --- a/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java +++ b/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java @@ -76,7 +76,7 @@ public ExtConnection(String command) { public void open() throws AuthenticationException, CommandAbortedException { try { process = Runtime.getRuntime().exec(command); - setInputStream(new LoggedDataInputStream(new BufferedInputStream(process.getInputStream()))); + setInputStream(new LoggedDataInputStream(process.getInputStream())); setOutputStream(new LoggedDataOutputStream(new BufferedOutputStream(process.getOutputStream()))); } catch (IOException e) { throw new AuthenticationException(e, "Failed to execute: " + command); diff --git a/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java b/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java index 8fb9c4b..004e235 100644 --- a/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java +++ b/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java @@ -226,9 +226,7 @@ private void openConnection(String preamble, String postamble) LoggedDataOutputStream outputStream = new LoggedDataOutputStream(bos); setOutputStream(outputStream); - BufferedInputStream bis 
= - new BufferedInputStream(socket.getInputStream(), 32768); - LoggedDataInputStream inputStream = new LoggedDataInputStream(bis); + LoggedDataInputStream inputStream = new LoggedDataInputStream(socket, 32768); setInputStream(inputStream); outputStream.writeBytes(preamble, "US-ASCII"); diff --git a/src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java b/src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java new file mode 100644 index 0000000..ab0ae19 --- /dev/null +++ b/src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java @@ -0,0 +1,370 @@ +package org.netbeans.lib.cvsclient.util; + +import static org.netbeans.lib.cvsclient.util.InterruptibleInputStream.StreamOp.*; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +/** + * InputStream wrapper that mimics the behavior of a socket input stream. The + * read methods have a timeout. The {@link #close()} call returns instantly and + * cannot throw. + * <p> + * The actual read is performed in a dedicated thread. The thread lives until + * {@link #close()} is called and the current read operation completes. + * <p> + * The main purpose of the class is to wrap pipes. The thread MAY hang during + * read, if nothing is written to the other side of the pipe. Then again the + * thread is merely a dependency of the resource on the other side of the pipe. + * If that resource leaks, the thread will stay. + * <p> + * Data is not lost because of read timeouts and the last operation can be + * repeated. A side effect is that a single {@link #skip(long)} call cannot skip + * more bytes than we can save in the buffer. + * <p> + * Methods may throw the usual SocketInputStream exceptions plus + * {@link InterruptedException} in case of interrupted wait. 
In that case, the + * stream is not closed and the interrupted status of the thread is set. + */ +public class InterruptibleInputStream extends FilterInputStream { + + public InterruptibleInputStream(InputStream in) { + super(in); + } + + public void setOpTimeout(long millis) { + this.timeoutNanos = millis * MILLION; + } + + /** + * If false, wait operations will throw {@link InterruptedIOException}. If + * true, to behave more like a socket, wait operations will retry after + * {@link InterruptedException}. In the end, the interrupted status of the + * thread will be restored. Default is true. + */ + public void setIgnoreInterrupts(boolean b) { + this.ignoreInterrupts = b; + } + + private long timeoutNanos; + + private boolean ignoreInterrupts = true; + + enum StreamOp { + READ, AVAIL + } + + private final Object lock = new Object(); + + private Thread waiter; + + private Thread worker; + + private boolean opCompleted; + + private StreamOp currentOp; + + private boolean closed; + + private byte[] currentBuf = new byte[1024]; + + private int currentOfs, currentLen; + + private int resInt; + + private Throwable resEx; + + // final value assigned to currentOp on return from requestOp() + private StreamOp finalOpValue; + + @Override + public int read() throws IOException { + byte[] buf = new byte[1]; + return read(buf, 0, 1) == -1 ? -1 : buf[0]; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + return checkAndRequestOp(READ, b, off, len); + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + int skipN = (int) Math.min(currentBuf.length, n); + int res = checkAndRequestOp(READ, null, 0, skipN); + return res == -1 ? 
0 : res; + } + + @Override + public int available() throws IOException { + return checkAndRequestOp(AVAIL, null, 0, 0); + } + + @Override + public void close() throws IOException { + synchronized (lock) { + if (!closed) { + closed = true; + lock.notifyAll(); + } + } + } + + @Override + public void mark(int readlimit) { + // + } + + @Override + public void reset() throws IOException { + // + } + + @Override + public boolean markSupported() { + return false; + } + + private int checkAndRequestOp(StreamOp op, byte[] buf, int off, int len) + throws IOException { + long deadline = System.nanoTime() + + (timeoutNanos > 0 ? timeoutNanos : Long.MAX_VALUE); + synchronized (lock) { + checkClosed(); + ensureThread(); + + boolean[] interrupted = new boolean[1]; + try { + while (waiter != null && waiter.isAlive()) { + waitOrThrow(deadline, interrupted); + } + + waiter = Thread.currentThread(); + + try { + return requestOp(op, buf, off, len, deadline, interrupted); + } finally { + currentOp = finalOpValue; + waiter = null; + lock.notifyAll(); + } + } finally { + if (interrupted[0]) { + Thread.currentThread().interrupt(); + } + } + } + } + + private void checkClosed() throws IOException { + if (closed) { + throw new SocketException("Stream closed"); + } + } + + private int requestOp(StreamOp op, byte[] buf, int off, int len, + long deadline, boolean[] interrupted) throws IOException { + finalOpValue = currentOp; + if (currentOp != null) { + if (op == AVAIL && currentOp == READ && !opCompleted) { + // available() should not block for long unless another thread + // is waiting for a read op + return 0; + } + + // currentOp will not change during the following wait() + while (!opCompleted) { + waitOrThrow(deadline, interrupted); + } + + // default is to reset finalOpValue unless there's + // unread data in buffer which can be read + finalOpValue = null; + + if (currentOp == READ) { + switch (op) { + case AVAIL: + if (resEx != null || resInt <= 0) { + // try again: maybe EOF no more 
+ break; + } + // currentOp should stay READ + finalOpValue = currentOp; + return resInt; + case READ: + if (resEx != null || resInt <= 0) { + // try again: maybe EOF no more + break; + } + int newLen = Math.min(resInt, len); + if (buf != null) { + System.arraycopy(currentBuf, currentOfs, buf, off, + newLen); + } + currentOfs += newLen; + resInt -= newLen; + + if (resInt != 0) { + // something's left in buffer + // currentOp should stay READ + finalOpValue = currentOp; + } + return newLen; + } + } + } + + resEx = null; + + // buf is null when called from skip() + if (op == READ && currentBuf.length < len) { + currentBuf = new byte[len]; + } + currentOfs = 0; + currentLen = len; + + finalOpValue = op; + currentOp = op; + opCompleted = false; + lock.notifyAll(); + + do { + waitOrThrow(deadline, interrupted); + } while (!opCompleted); + + finalOpValue = null; + + if (resEx != null) { + if (resEx instanceof IOException) { + // IOException subclasses handled differently by caller. + // Cannot wrap with a generic IOException. + if (resEx.getCause() == null) { + // Can preserve original stacktrace in the cause. + Exception causeStackTrace = new Exception(resEx.getClass() + .getName() + ": " + resEx.getMessage()); + causeStackTrace.setStackTrace(resEx.getStackTrace()); + resEx.initCause(causeStackTrace); + } + // Re-thrown exception must have current thread's stacktrace. 
+ resEx.fillInStackTrace(); + throw (IOException) resEx; + } + throw new RuntimeException(resEx); + } + if (buf != null && resInt != -1) { + System.arraycopy(currentBuf, 0, buf, off, resInt); + } + return resInt; + } + + private void ensureThread() { + if (worker != null) { + return; + } + worker = new Thread() { + + @Override + public void run() { + try { + runOps(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + worker.setName("reader-" + worker.getId()); + worker.setDaemon(true); + worker.start(); + } + + private void runOps() throws InterruptedException { + boolean alreadyThrowing = true; + try { + while (waitOp()) { + runOp(); + } + alreadyThrowing = false; + } finally { + synchronized (lock) { + closed = true; + lock.notifyAll(); + } + try { + in.close(); + } catch (Exception e) { + if (!alreadyThrowing) { + throw new RuntimeException(e); + } + } + } + } + + private boolean waitOp() throws InterruptedException { + synchronized (lock) { + while (!closed) { + if (currentOp != null && !opCompleted) { + return true; + } + lock.wait(); + } + } + return false; + } + + private void runOp() { + try { + switch (currentOp) { + case READ: + resInt = in.read(currentBuf, 0, currentLen); + break; + case AVAIL: + resInt = in.available(); + break; + } + } catch (Error e) { + resEx = e; + throw e; + } catch (Throwable e) { + resEx = e; + } finally { + synchronized (lock) { + opCompleted = true; + lock.notifyAll(); + } + } + } + + private void waitOrThrow(long deadline, boolean[] interrupted) + throws IOException { + long delay = deadline - System.nanoTime(); + if (delay <= 0) { + throw new SocketTimeoutException("timeout"); + } + try { + lock.wait(delay / MILLION, (int) (delay % MILLION)); + } catch (InterruptedException e) { + interrupted[0] = true; + if (!ignoreInterrupts) { + throw new InterruptedIOException("interrupted"); + } + } + checkClosed(); + } + + private static int MILLION = 1000000; +} diff --git 
a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java index 1ac2033..737414d 100644 --- a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java +++ b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java @@ -49,6 +49,9 @@ package org.netbeans.lib.cvsclient.util; import java.io.*; +import java.lang.reflect.Field; +import java.net.Socket; +import java.net.SocketTimeoutException; /** * This input stream worked exactly like the normal DataInputStream except that @@ -57,6 +60,8 @@ */ public class LoggedDataInputStream extends FilterInputStream { + public static final int DEFAULT_SO_TIMEOUT = 100; + private long counter; /** @@ -64,7 +69,28 @@ * @param in the stream */ public LoggedDataInputStream(InputStream in) { - super(in); + super(wrapIn(in)); + } + + public LoggedDataInputStream(InputStream in, int bufSz) { + super(wrapIn(in, bufSz)); + } + + public LoggedDataInputStream(Socket sock, int bufSz) throws IOException { + super(new BufferedInputStream(sock.getInputStream(), bufSz)); + sock.setSoTimeout(DEFAULT_SO_TIMEOUT); + } + + private static InputStream wrapIn(InputStream in) { + int[] bufSz = new int[1]; + InputStream unwr = unwrapBIS(in, bufSz); + return wrapIn(unwr, bufSz[0]); + } + + private static InputStream wrapIn(InputStream in, int bufSz) { + InterruptibleInputStream res = new InterruptibleInputStream(in); + res.setOpTimeout(DEFAULT_SO_TIMEOUT); + return new BufferedInputStream(res, bufSz); } /** @@ -92,16 +118,11 @@ public ByteArray readLineBytes() throws IOException { if (Thread.currentThread().isInterrupted()) { break; } - if (in.available() == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException iex) { - Thread.currentThread().interrupt(); - break loop; - } + try { + ch = in.read(); + } catch (SocketTimeoutException e) { continue; } - ch = in.read(); counter++; switch (ch) { case -1: @@ -133,16 +154,11 @@ public ByteArray readLineBytes() throws 
IOException { if (Thread.currentThread().isInterrupted()) { break; } - if (in.available() == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException iex) { - Thread.currentThread().interrupt(); - break loop; - } + try { + ch = in.read(); + } catch (SocketTimeoutException e) { continue; } - ch = in.read(); counter++; switch (ch) { case -1: @@ -170,12 +186,7 @@ public void close() throws IOException { * array of bytes. */ public int read(byte[] b) throws IOException { - int read = in.read(b); - if (read != -1) { - Logger.logInput(b, 0, read); - counter += read; - } - return read; + return read(b, 0, b.length); } /** @@ -183,7 +194,7 @@ public int read(byte[] b) throws IOException { * bytes */ public int read(byte[] b, int off, int len) throws IOException { - int read = in.read(b, off, len); + int read = read1(b, off, len); if (read != -1) { Logger.logInput(b, off, read); counter += read; @@ -191,8 +202,18 @@ public int read(byte[] b, int off, int len) throws IOException { return read; } + private int read1(byte[] b, int off, int len) throws IOException { + for (;;) { + try { + return in.read(b, off, len); + } catch (SocketTimeoutException e) { + // no timeout as in original LoggedDataInputStream + } + } + } + public long skip(long n) throws IOException { - long skip = super.skip(n); + long skip = skip1(n); if (skip > 0) { Logger.logInput(new String("<skipped " + skip + " bytes>").getBytes("utf8")); // NOI18N counter += skip; @@ -200,21 +221,33 @@ public long skip(long n) throws IOException { return skip; } + private long skip1(long n) throws IOException { + for (;;) { + try { + return super.skip(n); + } catch (SocketTimeoutException e) { + // no timeout as in original LoggedDataInputStream + } + } + } + /** * Interruptible read. 
* @throws InterruptedIOException on thread interrupt */ public int read() throws IOException { - while (in.available() == 0) { + int i; + for (;;) { try { - Thread.sleep(100); - } catch (InterruptedException iex) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(); + i = super.read(); + break; + } catch (SocketTimeoutException e) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedIOException(); + } } } - int i = super.read(); if (i != -1) { Logger.logInput((char) i); counter++; @@ -233,4 +266,45 @@ public void setUnderlyingStream(InputStream is) { public long getCounter() { return counter; } + + /** + * Get the underlying stream from the {@link BufferedInputStream} returned + * by {@link Process#getInputStream()} to allow buffering in some other + * place. + */ + public static InputStream unwrapBIS(InputStream in, int[] bufSz) { + bufSz[0] = 8192; + // do not unwrap BufferedInputStream subclasses + if (in != null && BufferedInputStream.class.equals(in.getClass()) + && FLD_IN != null) { + try { + byte[] buf = ((byte[]) FLD_BUF.get(in)); + if (buf != null) { + bufSz[0] = buf.length; + } + return (InputStream) FLD_IN.get(in); + } catch (Exception e) { + // can't happen. Fallthrough + } + } + return in; + } + + private static final Field FLD_IN; + private static final Field FLD_BUF; + + static { + FLD_IN = getField(FilterInputStream.class, "in"); + FLD_BUF = getField(BufferedInputStream.class, "buf"); + } + + private static Field getField(Class clazz, String name) { + try { + Field f = FilterInputStream.class.getDeclaredField(name); + f.setAccessible(true); + return f; + } catch (Throwable e) { + return null; + } + } } -- 2.16.1
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information about the NetBeans mailing lists, visit: https://cwiki.apache.org/confluence/display/NETBEANS/Mailing+lists
