Here's the proposed patch.
The changed lines were formatted with the NetBeans 8.2 default formatter. The
copyright header has not yet been added to the new file.

> Hi.
> If the setup is wrong, a `cvs log` command may print nothing. This makes the
> LoggedDataInputStream#read() method in the NetBeans CVS client loop forever.
> See this Stack Overflow question for details:
> https://stackoverflow.com/questions/48741854/inputstream-wrapper-with-actual-read-in-a-dedicated-thread
>
> There's a bug mentioning sleep() inside LoggedDataInputStream: 
> https://netbeans.org/bugzilla/show_bug.cgi?id=254761
>
> There are two types of input streams the LoggedDataInputStream works with. If 
> it's a server connection, the stream is a SocketInputStream. If it's a local 
> connection, the
> stream is a read side of a pipe returned by Process.getInputStream().
>
> I suppose that we want to keep the Thread.interrupted() check in 
> LoggedDataInputStream, but using available() is definitely not the right way 
> to check for premature EOF.
> I also suppose that we want to keep the java 6 level.
>
> I want to remove the sleep() and available() calls from
> LoggedDataInputStream and instead catch the SocketTimeoutException and check
> the interrupted status on a timeout.
>
> In case of a socket we will set the socket timeout to 100ms.
> In case of a pipe we will perform reading in a dedicated thread. See the SO 
> question above.
>

From 5c656b53953da4de424a1ae1c0b2ec12c327177b Mon Sep 17 00:00:00 2001
From: Ilya Basin <[email protected]>
Date: Wed, 14 Feb 2018 16:48:46 +0300
Subject: [PATCH 1/2] 254761: unneeded fiddling with thread interrupted status

---
 src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java 
b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java
index 2108b7d..1ac2033 100644
--- a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java
+++ b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java
@@ -89,8 +89,7 @@ public ByteArray readLineBytes() throws IOException {
         boolean throwEOF = true;
         ByteArray byteArray = new ByteArray();
         loop: while (true) {
-            if (Thread.interrupted()) {
-                Thread.currentThread().interrupt();
+            if (Thread.currentThread().isInterrupted()) {
                 break;
             }
             if (in.available() == 0) {
@@ -131,8 +130,7 @@ public ByteArray readLineBytes() throws IOException {
         int ch;
         ByteArray byteArray = new ByteArray();
         loop: while (len != 0) {
-            if (Thread.interrupted()) {
-                Thread.currentThread().interrupt();
+            if (Thread.currentThread().isInterrupted()) {
                 break;
             }
             if (in.available() == 0) {
-- 
2.16.1


From a3f472508ee3c63a7c5336a60945af32498fc394 Mon Sep 17 00:00:00 2001
From: Ilya Basin <[email protected]>
Date: Thu, 15 Feb 2018 13:35:21 +0300
Subject: [PATCH 2/2] 254761: CVS don't sleep, don't hang in
 LoggedDataInputStream

---
 .../lib/cvsclient/connection/ExtConnection.java    |   2 +-
 .../cvsclient/connection/PServerConnection.java    |   4 +-
 .../cvsclient/util/InterruptibleInputStream.java   | 370 +++++++++++++++++++++
 .../lib/cvsclient/util/LoggedDataInputStream.java  | 136 ++++++--
 4 files changed, 477 insertions(+), 35 deletions(-)
 create mode 100644 
src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java

diff --git a/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java 
b/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java
index 5d6f52c..56ef813 100644
--- a/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java
+++ b/src/org/netbeans/lib/cvsclient/connection/ExtConnection.java
@@ -76,7 +76,7 @@ public ExtConnection(String command) {
     public void open() throws AuthenticationException, CommandAbortedException 
{
         try {
             process = Runtime.getRuntime().exec(command);
-            setInputStream(new LoggedDataInputStream(new 
BufferedInputStream(process.getInputStream())));
+            setInputStream(new 
LoggedDataInputStream(process.getInputStream()));
             setOutputStream(new LoggedDataOutputStream(new 
BufferedOutputStream(process.getOutputStream())));
         } catch (IOException e) {
             throw new AuthenticationException(e, "Failed to execute: " + 
command);
diff --git a/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java 
b/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java
index 8fb9c4b..004e235 100644
--- a/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java
+++ b/src/org/netbeans/lib/cvsclient/connection/PServerConnection.java
@@ -226,9 +226,7 @@ private void openConnection(String preamble, String 
postamble)
             LoggedDataOutputStream outputStream = new 
LoggedDataOutputStream(bos);
             setOutputStream(outputStream);
 
-            BufferedInputStream bis =
-                    new BufferedInputStream(socket.getInputStream(), 32768);
-            LoggedDataInputStream inputStream = new LoggedDataInputStream(bis);
+            LoggedDataInputStream inputStream = new 
LoggedDataInputStream(socket, 32768);
             setInputStream(inputStream);
                        
             outputStream.writeBytes(preamble, "US-ASCII");
diff --git a/src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java 
b/src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java
new file mode 100644
index 0000000..ab0ae19
--- /dev/null
+++ b/src/org/netbeans/lib/cvsclient/util/InterruptibleInputStream.java
@@ -0,0 +1,370 @@
+package org.netbeans.lib.cvsclient.util;
+
+import static 
org.netbeans.lib.cvsclient.util.InterruptibleInputStream.StreamOp.*;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+/**
+ * InputStream wrapper that mimics the behavior of a socket input stream. The
+ * read methods have a timeout. The {@link #close()} call returns instantly and
+ * cannot throw.
+ * <p>
+ * The actual read is performed in a dedicated thread. The thread lives until
+ * {@link #close()} is called and the current read operation completes.
+ * <p>
+ * The main purpose of the class is to wrap pipes. The thread MAY hang during
+ * read, if nothing is written to the other side of the pipe. Then again the
+ * thread is merely a dependency of the resource on the other side of the pipe.
+ * If that resource leaks, the thread will stay.
+ * <p>
+ * Data is not lost because of read timeouts and the last operation can be
+ * repeated. A side effect is that a single {@link #skip(long)} call cannot 
skip
+ * more bytes than we can save in the buffer.
+ * <p>
+ * Methods may throw the usual SocketInputStream exceptions plus
+ * {@link InterruptedIOException} in case of interrupted wait. In that case, the
+ * stream is not closed and the interrupted status of the thread is set.
+ */
+public class InterruptibleInputStream extends FilterInputStream {
+
+    public InterruptibleInputStream(InputStream in) {
+        super(in);
+    }
+
+    public void setOpTimeout(long millis) {
+        this.timeoutNanos = millis * MILLION;
+    }
+
+    /**
+     * If false, wait operations will throw {@link InterruptedIOException}. If
+     * true, to behave more like a socket, wait operations will retry after
+     * {@link InterruptedException}. In the end, the interrupted status of the
+     * thread will be restored. Default is true.
+     */
+    public void setIgnoreInterrupts(boolean b) {
+        this.ignoreInterrupts = b;
+    }
+
+    private long timeoutNanos;
+
+    private boolean ignoreInterrupts = true;
+
+    enum StreamOp {
+        READ, AVAIL
+    }
+
+    private final Object lock = new Object();
+
+    private Thread waiter;
+
+    private Thread worker;
+
+    private boolean opCompleted;
+
+    private StreamOp currentOp;
+
+    private boolean closed;
+
+    private byte[] currentBuf = new byte[1024];
+
+    private int currentOfs, currentLen;
+
+    private int resInt;
+
+    private Throwable resEx;
+
+    // final value assigned to currentOp on return from requestOp()
+    private StreamOp finalOpValue;
+
+    @Override
+    public int read() throws IOException {
+        byte[] buf = new byte[1]; // single-byte read delegates to read(byte[], int, int)
+        return read(buf, 0, 1) == -1 ? -1 : (buf[0] & 0xff); // 0..255, so 0xFF is not EOF
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        return checkAndRequestOp(READ, b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+        int skipN = (int) Math.min(currentBuf.length, n);
+        int res = checkAndRequestOp(READ, null, 0, skipN);
+        return res == -1 ? 0 : res;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return checkAndRequestOp(AVAIL, null, 0, 0);
+    }
+
+    @Override
+    public void close() throws IOException {
+        synchronized (lock) {
+            if (!closed) {
+                closed = true;
+                lock.notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        //
+    }
+
+    @Override
+    public void reset() throws IOException {
+        //
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    private int checkAndRequestOp(StreamOp op, byte[] buf, int off, int len)
+            throws IOException {
+        long deadline = System.nanoTime()
+                + (timeoutNanos > 0 ? timeoutNanos : Long.MAX_VALUE);
+        synchronized (lock) {
+            checkClosed();
+            ensureThread();
+
+            boolean[] interrupted = new boolean[1];
+            try {
+                while (waiter != null && waiter.isAlive()) {
+                    waitOrThrow(deadline, interrupted);
+                }
+
+                waiter = Thread.currentThread();
+
+                try {
+                    return requestOp(op, buf, off, len, deadline, interrupted);
+                } finally {
+                    currentOp = finalOpValue;
+                    waiter = null;
+                    lock.notifyAll();
+                }
+            } finally {
+                if (interrupted[0]) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    private void checkClosed() throws IOException {
+        if (closed) {
+            throw new SocketException("Stream closed");
+        }
+    }
+
+    private int requestOp(StreamOp op, byte[] buf, int off, int len,
+            long deadline, boolean[] interrupted) throws IOException {
+        finalOpValue = currentOp;
+        if (currentOp != null) {
+            if (op == AVAIL && currentOp == READ && !opCompleted) {
+                // available() should not block for long unless another thread
+                // is waiting for a read op
+                return 0;
+            }
+
+            // currentOp will not change during the following wait()
+            while (!opCompleted) {
+                waitOrThrow(deadline, interrupted);
+            }
+
+            // default is to reset finalOpValue unless there's
+            // unread data in buffer which can be read
+            finalOpValue = null;
+
+            if (currentOp == READ) {
+                switch (op) {
+                    case AVAIL:
+                        if (resEx != null || resInt <= 0) {
+                            // try again: maybe EOF no more
+                            break;
+                        }
+                        // currentOp should stay READ
+                        finalOpValue = currentOp;
+                        return resInt;
+                    case READ:
+                        if (resEx != null || resInt <= 0) {
+                            // try again: maybe EOF no more
+                            break;
+                        }
+                        int newLen = Math.min(resInt, len);
+                        if (buf != null) {
+                            System.arraycopy(currentBuf, currentOfs, buf, off,
+                                    newLen);
+                        }
+                        currentOfs += newLen;
+                        resInt -= newLen;
+
+                        if (resInt != 0) {
+                            // something's left in buffer
+                            // currentOp should stay READ
+                            finalOpValue = currentOp;
+                        }
+                        return newLen;
+                }
+            }
+        }
+
+        resEx = null;
+
+        // buf is null when called from skip()
+        if (op == READ && currentBuf.length < len) {
+            currentBuf = new byte[len];
+        }
+        currentOfs = 0;
+        currentLen = len;
+
+        finalOpValue = op;
+        currentOp = op;
+        opCompleted = false;
+        lock.notifyAll();
+
+        do {
+            waitOrThrow(deadline, interrupted);
+        } while (!opCompleted);
+
+        finalOpValue = null;
+
+        if (resEx != null) {
+            if (resEx instanceof IOException) {
+                // IOException subclasses handled differently by caller.
+                // Cannot wrap with a generic IOException.
+                if (resEx.getCause() == null) {
+                    // Can preserve original stacktrace in the cause.
+                    Exception causeStackTrace = new Exception(resEx.getClass()
+                            .getName() + ": " + resEx.getMessage());
+                    causeStackTrace.setStackTrace(resEx.getStackTrace());
+                    resEx.initCause(causeStackTrace);
+                }
+                // Re-thrown exception must have current thread's stacktrace.
+                resEx.fillInStackTrace();
+                throw (IOException) resEx;
+            }
+            throw new RuntimeException(resEx);
+        }
+        if (buf != null && resInt != -1) {
+            System.arraycopy(currentBuf, 0, buf, off, resInt);
+        }
+        return resInt;
+    }
+
+    private void ensureThread() {
+        if (worker != null) {
+            return;
+        }
+        worker = new Thread() {
+
+            @Override
+            public void run() {
+                try {
+                    runOps();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        worker.setName("reader-" + worker.getId());
+        worker.setDaemon(true);
+        worker.start();
+    }
+
+    private void runOps() throws InterruptedException {
+        boolean alreadyThrowing = true;
+        try {
+            while (waitOp()) {
+                runOp();
+            }
+            alreadyThrowing = false;
+        } finally {
+            synchronized (lock) {
+                closed = true;
+                lock.notifyAll();
+            }
+            try {
+                in.close();
+            } catch (Exception e) {
+                if (!alreadyThrowing) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private boolean waitOp() throws InterruptedException {
+        synchronized (lock) {
+            while (!closed) {
+                if (currentOp != null && !opCompleted) {
+                    return true;
+                }
+                lock.wait();
+            }
+        }
+        return false;
+    }
+
+    private void runOp() {
+        try {
+            switch (currentOp) {
+                case READ:
+                    resInt = in.read(currentBuf, 0, currentLen);
+                    break;
+                case AVAIL:
+                    resInt = in.available();
+                    break;
+            }
+        } catch (Error e) {
+            resEx = e;
+            throw e;
+        } catch (Throwable e) {
+            resEx = e;
+        } finally {
+            synchronized (lock) {
+                opCompleted = true;
+                lock.notifyAll();
+            }
+        }
+    }
+
+    private void waitOrThrow(long deadline, boolean[] interrupted)
+            throws IOException {
+        long delay = deadline - System.nanoTime();
+        if (delay <= 0) {
+            throw new SocketTimeoutException("timeout");
+        }
+        try {
+            lock.wait(delay / MILLION, (int) (delay % MILLION));
+        } catch (InterruptedException e) {
+            interrupted[0] = true;
+            if (!ignoreInterrupts) {
+                throw new InterruptedIOException("interrupted");
+            }
+        }
+        checkClosed();
+    }
+
+    private static int MILLION = 1000000;
+}
diff --git a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java 
b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java
index 1ac2033..737414d 100644
--- a/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java
+++ b/src/org/netbeans/lib/cvsclient/util/LoggedDataInputStream.java
@@ -49,6 +49,9 @@
 package org.netbeans.lib.cvsclient.util;
 
 import java.io.*;
+import java.lang.reflect.Field;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
 
 /**
  * This input stream worked exactly like the normal DataInputStream except that
@@ -57,6 +60,8 @@
  */
 public class LoggedDataInputStream extends FilterInputStream {
 
+    public static final int DEFAULT_SO_TIMEOUT = 100;
+
     private long counter;
 
     /**
@@ -64,7 +69,28 @@
      * @param in the stream
      */
     public LoggedDataInputStream(InputStream in) {
-        super(in);
+        super(wrapIn(in));
+    }
+
+    public LoggedDataInputStream(InputStream in, int bufSz) {
+        super(wrapIn(in, bufSz));
+    }
+
+    public LoggedDataInputStream(Socket sock, int bufSz) throws IOException {
+        super(new BufferedInputStream(sock.getInputStream(), bufSz));
+        sock.setSoTimeout(DEFAULT_SO_TIMEOUT);
+    }
+
+    private static InputStream wrapIn(InputStream in) {
+        int[] bufSz = new int[1];
+        InputStream unwr = unwrapBIS(in, bufSz);
+        return wrapIn(unwr, bufSz[0]);
+    }
+
+    private static InputStream wrapIn(InputStream in, int bufSz) {
+        InterruptibleInputStream res = new InterruptibleInputStream(in);
+        res.setOpTimeout(DEFAULT_SO_TIMEOUT);
+        return new BufferedInputStream(res, bufSz);
     }
 
     /**
@@ -92,16 +118,11 @@ public ByteArray readLineBytes() throws IOException {
             if (Thread.currentThread().isInterrupted()) {
                 break;
             }
-            if (in.available() == 0) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException iex) {
-                    Thread.currentThread().interrupt();
-                    break loop;
-                }
+            try {
+                ch = in.read();
+            } catch (SocketTimeoutException e) {
                 continue;
             }
-            ch = in.read();
             counter++;
             switch (ch) {
                 case -1:
@@ -133,16 +154,11 @@ public ByteArray readLineBytes() throws IOException {
             if (Thread.currentThread().isInterrupted()) {
                 break;
             }
-            if (in.available() == 0) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException iex) {
-                    Thread.currentThread().interrupt();
-                    break loop;
-                }
+            try {
+                ch = in.read();
+            } catch (SocketTimeoutException e) {
                 continue;
             }
-            ch = in.read();
             counter++;
             switch (ch) {
                 case -1:
@@ -170,12 +186,7 @@ public void close() throws IOException {
      * array of bytes.
      */
     public int read(byte[] b) throws IOException {
-        int read = in.read(b);
-        if (read != -1) {
-            Logger.logInput(b, 0, read);
-            counter += read;
-        }
-        return read;
+        return read(b, 0, b.length);
     }
 
     /**
@@ -183,7 +194,7 @@ public int read(byte[] b) throws IOException {
      * bytes
      */
     public int read(byte[] b, int off, int len) throws IOException {
-        int read = in.read(b, off, len);
+        int read = read1(b, off, len);
         if (read != -1) {
             Logger.logInput(b, off, read);
             counter += read;
@@ -191,8 +202,18 @@ public int read(byte[] b, int off, int len) throws 
IOException {
         return read;
     }
 
+    private int read1(byte[] b, int off, int len) throws IOException {
+        for (;;) {
+            try {
+                return in.read(b, off, len);
+            } catch (SocketTimeoutException e) {
+                // no timeout as in original LoggedDataInputStream
+            }
+        }
+    }
+
     public long skip(long n) throws IOException {
-        long skip = super.skip(n);
+        long skip = skip1(n);
         if (skip > 0) {
             Logger.logInput(new String("<skipped " + skip + " 
bytes>").getBytes("utf8")); // NOI18N
             counter += skip;
@@ -200,21 +221,33 @@ public long skip(long n) throws IOException {
         return skip;
     }
 
+    private long skip1(long n) throws IOException {
+        for (;;) {
+            try {
+                return super.skip(n);
+            } catch (SocketTimeoutException e) {
+                // no timeout as in original LoggedDataInputStream
+            }
+        }
+    }
+
     /**
      * Interruptible read.
      * @throws InterruptedIOException on thread interrupt
      */
     public int read() throws IOException {
-        while (in.available() == 0) {
+        int i;
+        for (;;) {
             try {
-                Thread.sleep(100);
-            } catch (InterruptedException iex) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException();
+                i = super.read();
+                break;
+            } catch (SocketTimeoutException e) {
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedIOException();
+                }
             }
         }
 
-        int i = super.read();
         if (i != -1) {
             Logger.logInput((char) i);
             counter++;
@@ -233,4 +266,45 @@ public void setUnderlyingStream(InputStream is) {
     public long getCounter() {
         return counter;
     }
+
+    /**
+     * Get the underlying stream from the {@link BufferedInputStream} returned
+     * by {@link Process#getInputStream()} to allow buffering in some other
+     * place.
+     */
+    public static InputStream unwrapBIS(InputStream in, int[] bufSz) {
+        bufSz[0] = 8192;
+        // do not unwrap BufferedInputStream subclasses
+        if (in != null && BufferedInputStream.class.equals(in.getClass())
+                && FLD_IN != null) {
+            try {
+                byte[] buf = ((byte[]) FLD_BUF.get(in));
+                if (buf != null) {
+                    bufSz[0] = buf.length;
+                }
+                return (InputStream) FLD_IN.get(in);
+            } catch (Exception e) {
+                // can't happen. Fallthrough
+            }
+        }
+        return in;
+    }
+
+    private static final Field FLD_IN;
+    private static final Field FLD_BUF;
+
+    static {
+        FLD_IN = getField(FilterInputStream.class, "in");
+        FLD_BUF = getField(BufferedInputStream.class, "buf");
+    }
+
+    private static Field getField(Class<?> clazz, String name) {
+        try {
+            Field f = clazz.getDeclaredField(name); // look on the requested class, not always FilterInputStream
+            f.setAccessible(true);
+            return f;
+        } catch (Throwable e) { // reflection unavailable: callers treat null as "cannot unwrap"
+            return null;
+        }
+    }
 }
-- 
2.16.1



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

For further information about the NetBeans mailing lists, visit:
https://cwiki.apache.org/confluence/display/NETBEANS/Mailing+lists

Reply via email to