The attachment contains the latest patch with two (embarrassingly stupid) fixes.
The good news is that the few testers had good results, including small performance gains. The bad news is that many (all?) streams do not signal EOF through available(), so read(byte[], int, int) keeps returning 0 instead of -1 at end of stream. Any opinions on that?
Index: BufferedInputStream.java
===================================================================
--- BufferedInputStream.java (revision 242)
+++ BufferedInputStream.java (working copy)
@@ -143,28 +143,6 @@
protected int marklimit;
/**
- * Check to make sure that underlying input stream has not been
- * nulled out due to close; if not return it;
- */
- private InputStream getInIfOpen() throws IOException {
- InputStream input = in;
- if (input == null)
- throw new IOException("Stream closed");
- return input;
- }
-
- /**
- * Check to make sure that buffer has not been nulled out due to
- * close; if not return it;
- */
- private byte[] getBufIfOpen() throws IOException {
- byte[] buffer = buf;
- if (buffer == null)
- throw new IOException("Stream closed");
- return buffer;
- }
-
- /**
* Creates a <code>BufferedInputStream</code>
* and saves its argument, the input stream
* <code>in</code>, for later use. An internal
@@ -197,45 +175,43 @@
}
/**
- * Fills the buffer with more data, taking into account
- * shuffling and other tricks for dealing with marks.
- * Assumes that it is being called by a synchronized method.
- * This method also assumes that all data has already been read in,
- * hence pos > count.
+ * Fills the empty buffer without blocking.
+ * Takes care of the marks too.
+ * @return the number of new bytes
*/
- private void fill() throws IOException {
- byte[] buffer = getBufIfOpen();
- if (markpos < 0)
- pos = 0; /* no mark: throw away the buffer */
- else if (pos >= buffer.length) /* no room left in buffer */
- if (markpos > 0) { /* can throw away early part of the buffer */
- int sz = pos - markpos;
- System.arraycopy(buffer, markpos, buffer, 0, sz);
- pos = sz;
- markpos = 0;
- } else if (buffer.length >= marklimit) {
- markpos = -1; /* buffer got too big, invalidate mark */
- pos = 0; /* drop buffer contents */
- } else { /* grow buffer */
- int nsz = pos * 2;
- if (nsz > marklimit)
- nsz = marklimit;
- byte nbuf[] = new byte[nsz];
- System.arraycopy(buffer, 0, nbuf, 0, pos);
- if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
- // Can't replace buf if there was an async close.
- // Note: This would need to be changed if fill()
- // is ever made accessible to multiple threads.
- // But for now, the only way CAS can fail is via close.
- // assert buf == null;
- throw new IOException("Stream closed");
- }
- buffer = nbuf;
- }
+ private int fill() throws IOException {
+ assert pos >= count; // empty buffer
+ if (markpos < 0) // no mark: discard buffer
+ pos = 0;
+ else if (markpos > 0) { // move data from markpos back to buf[0]
+ int readAheadSize = pos - markpos;
+ System.arraycopy(buf, markpos, buf, 0, readAheadSize);
+ pos = readAheadSize;
+ markpos = 0;
+ } else {
+ assert markpos == 0;
+ if (buf.length < marklimit) {
+ // double the size of buf until marklimit fits:
+ int newSize = pos * 2;
+ if (newSize > marklimit)
+ newSize = marklimit;
+ byte[] newBuf = new byte[newSize];
+ System.arraycopy(buf, 0, newBuf, 0, pos);
+ buf = newBuf;
+ }
+ }
+
count = pos;
- int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
- if (n > 0)
- count = n + pos;
+ int read = buf.length - pos;
+ int available = in.available();
+ if (read > available)
+ read = available;
+ if (read > 0) {
+ read = in.read(buf, pos, read);
+ if (read > 0)
+ count += read;
+ }
+ return read;
}
/**
@@ -251,36 +227,43 @@
* @see java.io.FilterInputStream#in
*/
public synchronized int read() throws IOException {
- if (pos >= count) {
- fill();
- if (pos >= count)
- return -1;
- }
- return getBufIfOpen()[pos++] & 0xff;
+ verifyNotClose();
+ if (pos == count) { // empty buffer
+ int loaded = fill();
+ if (loaded < 0)
+ return -1;
+ if (loaded == 0) // block
+ return in.read();
+ }
+ return buf[pos++] & 0xFF;
}
/**
- * Read characters into a portion of an array, reading from the underlying
- * stream at most once if necessary.
+ * Reads up to {@code b.length} bytes of data from this input stream into
+ * an array of bytes. This method blocks until some input is available.
+ *
+ * @throws NullPointerException if b is {@code null}
+ * @throws IOException if this input stream has been closed by invoking its
+ * {@link #close()} method, or an I/O error occurs.
+ * @see InputStream#read(byte[])
*/
- private int read1(byte[] b, int off, int len) throws IOException {
- int avail = count - pos;
- if (avail <= 0) {
- /* If the requested length is at least as large as the buffer, and
- if there is no mark/reset activity, do not bother to copy the
- bytes into the local buffer. In this way buffered streams will
- cascade harmlessly. */
- if (len >= getBufIfOpen().length && markpos < 0) {
- return getInIfOpen().read(b, off, len);
- }
- fill();
- avail = count - pos;
- if (avail <= 0) return -1;
- }
- int cnt = (avail < len) ? avail : len;
- System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
- pos += cnt;
- return cnt;
+ public int read(byte[] b) throws IOException {
+ int needed = b.length;
+ if (needed == 0)
+ return 0;
+
+ int done = read(b, 0, needed);
+ if (done < 0)
+ return -1;
+
+ while (done < needed) {
+ int next = read();
+ if (next < 0)
+ break;
+ b[done++] = (byte) next;
+ }
+
+ return done == 0 ? -1 : done;
}
/**
@@ -323,80 +306,99 @@
public synchronized int read(byte b[], int off, int len)
throws IOException
{
- getBufIfOpen(); // Check for closed stream
- if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
+ // NullPointerException if b is null:
+ if (off < 0 || len < 0 || (b.length - off < len))
+ throw new IndexOutOfBoundsException();
+ verifyNotClose();
+ if (len == 0)
return 0;
+ int avail = count - pos;
+ assert avail >= 0; // see javadoc count & pos
+
+ if (avail >= len) {
+ System.arraycopy(buf, pos, b, off, len);
+ pos += len;
+ return len;
}
- int n = 0;
- for (;;) {
- int nread = read1(b, off + n, len - n);
- if (nread <= 0)
- return (n == 0) ? nread : n;
- n += nread;
- if (n >= len)
- return n;
- // if not closed but no bytes available, return
- InputStream input = in;
- if (input != null && input.available() <= 0)
- return n;
+ if (avail != 0) {
+ System.arraycopy(buf, pos, b, off, avail);
+ assert pos + avail == count;
+ pos = count;
}
+ int done = avail;
+
+ do {
+ int loaded = fill();
+ if (loaded <= 0)
+ return done == 0 ? loaded : done;
+ int read = len - done;
+ if (read > loaded)
+ read = loaded;
+ assert read > 0;
+ System.arraycopy(buf, pos, b, off + done, read);
+ pos += read;
+ done += read;
+ } while (done != len);
+ return len;
}
/**
- * See the general contract of the <code>skip</code>
- * method of <code>InputStream</code>.
- *
- * @exception IOException if the stream does not support seek,
- * or if this input stream has been closed by
- * invoking its {@link #close()} method, or an
- * I/O error occurs.
+ * Skips over and discards {@code n} bytes of data from this input stream.
+ * This implementation never skips more bytes than {@link #available()}.
+ * @see InputStream#skip(long)
*/
public synchronized long skip(long n) throws IOException {
- getBufIfOpen(); // Check for closed stream
- if (n <= 0) {
- return 0;
- }
- long avail = count - pos;
-
- if (avail <= 0) {
- // If no mark position set then don't keep in buffer
- if (markpos <0)
- return getInIfOpen().skip(n);
-
- // Fill in buffer to save bytes for reset
- fill();
- avail = count - pos;
- if (avail <= 0)
- return 0;
+ verifyNotClose();
+ if (n <= 0)
+ return 0;
+ long avail = count - pos;
+ assert avail >= 0; // see javadoc count & pos
+
+ if (avail >= n) {
+ pos += n;
+ return n;
}
-
- long skipped = (avail < n) ? avail : n;
- pos += skipped;
+
+ // IOException if in does not support seek:
+ long skipped = in.skip(n - avail);
+ if (skipped < 0) // some streams do :(
+ skipped = 0;
+
+ skipped += avail;
+ assert pos + avail == count;
+ pos = count;
+
return skipped;
}
/**
* Returns an estimate of the number of bytes that can be read (or
* skipped over) from this input stream without blocking by the next
- * invocation of a method for this input stream. The next invocation might be
- * the same thread or another thread. A single read or skip of this
+ * invocation of a method for this input stream. The next invocation might
+ * be the same thread or another thread. A single read or skip of this
* many bytes will not block, but may read or skip fewer bytes.
* <p>
- * This method returns the sum of the number of bytes remaining to be read in
- * the buffer (<code>count - pos</code>) and the result of calling the
- * {@link java.io.FilterInputStream#in in}.available().
+ * This method returns the sum of the number of bytes remaining to be read
+ * in the buffer (<code>count - pos</code>) and the result of calling
+ * the {@link java.io.FilterInputStream#in in}.available().
*
- * @return an estimate of the number of bytes that can be read (or skipped
- * over) from this input stream without blocking.
+ * @return an estimate of the number of bytes that can be read (or
+ * skipped over) from this input stream without blocking.
* @exception IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
*/
public synchronized int available() throws IOException {
- return getInIfOpen().available() + (count - pos);
+ verifyNotClose();
+ int inBuffer = count - pos;
+ int inStream = in.available();
+ if (inStream < 0) // some streams do :(
+ inStream = 0;
+ int available = inBuffer + inStream;
+ if (available < 0) // overflow
+ available = Integer.MAX_VALUE;
+ return available;
}
/**
@@ -429,10 +431,10 @@
* @see java.io.BufferedInputStream#mark(int)
*/
public synchronized void reset() throws IOException {
- getBufIfOpen(); // Cause exception if closed
- if (markpos < 0)
- throw new IOException("Resetting to invalid mark");
- pos = markpos;
+ verifyNotClose();
+ if (markpos < 0)
+ throw new IOException("Resetting to invalid mark");
+ pos = markpos;
}
/**
@@ -472,4 +474,14 @@
// Else retry in case a new buf was CASed in fill()
}
}
+
+ /**
+ * Verifies that the {@link #close()} method has not been called yet.
+ * @throws IOException if closed.
+ */
+ private void verifyNotClose() throws IOException {
+ if (buf == null || in == null)
+ throw new IOException("Stream closed");
+ }
+
}
signature.asc
Description: This is a digitally signed message part.
