Attached and sending!

On 24/10/2016 13:48, Pavel Rappo wrote:
Could you please send a new email on this list with the source attached as a
text file?

On 23 Oct 2016, at 19:14, Brunoais <brunoa...@gmail.com> wrote:

Here's my poc/prototype:
http://pastebin.com/WRpYWDJF

I've implemented the bare minimum of the class that follows the same contract 
as BufferedReader, while flagging in comments all the issues I think it has or 
may have.
I also wrote some javadoc to help guide readers through the class.

I could have used more fields from BufferedReader but the names were so 
minimalistic that they were confusing me. I intend to change them before 
sending this to OpenJDK.

One of the major problems this has is long overflow. It is major because it 
is hidden, it will be extremely rare and it takes a really long time to 
reproduce. There are different ways of dealing with it, from just documenting 
it to actually writing code that handles it.
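
As a rough illustration of the overflow concern above (a sketch with hypothetical names, not code from the prototype): when positions are kept as ever-growing long values, the difference writePos - readPos stays correct even after the counters wrap around. The buffer slot computed with a modulo, however, only stays continuous across the wrap when the buffer length is a power of two. The 1 GB/s throughput used in main is an assumption, chosen only to show the time scale involved.

```java
class LongOverflowSketch {

    // Years until a byte counter starting at 0 passes Long.MAX_VALUE
    // at a constant (assumed) throughput.
    static double yearsUntilOverflow(double bytesPerSecond) {
        double seconds = Long.MAX_VALUE / bytesPerSecond;
        return seconds / (365.25 * 24 * 3600);
    }

    // Buffer slot for a virtual position. Unlike %, floorMod never
    // returns a negative index once the position has wrapped.
    static int slot(long virtualPos, int bufLength) {
        return (int) Math.floorMod(virtualPos, (long) bufLength);
    }

    public static void main(String[] args) {
        // Time scale at an assumed 1 GB/s
        System.out.println("Years to overflow: " + yearsUntilOverflow(1e9));

        // The distance between the two positions survives the wraparound
        long readPos = Long.MAX_VALUE - 5;
        long writePos = readPos + 15; // wraps past Long.MAX_VALUE
        System.out.println("Bytes available: " + (writePos - readPos));

        // Slot before and after the wrap, for a power-of-two length and for length 10
        System.out.println(slot(Long.MAX_VALUE, 8) + " -> " + slot(Long.MIN_VALUE, 8));
        System.out.println(slot(Long.MAX_VALUE, 10) + " -> " + slot(Long.MIN_VALUE, 10));
    }
}
```

With a length of 8 the slot goes from 7 to 0 across the wrap, as a ring buffer expects. With a length of 10 it jumps from 7 to 2, which would silently corrupt the stream.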

I wrote some simple test code for it to get some idea of its performance and 
correctness.

http://pastebin.com/eh6LFgwT

This isn't a thorough test of whether it actually works correctly, but I see no 
reason for it not to work correctly after fixing the 2 bugs that the test found.

I'll also leave here some conclusions about speed and resource consumption I 
found.

I made tests with default buffer sizes, 5000B, 15_000B and 500_000B. I noticed 
that, with my hardware, with the 1 530 000 000B file, I was getting around:

With all buffer sizes and fake work: 10~15s speed improvement (from 90% HDD 
speed to 100% HDD speed)
With all buffer sizes and no fake work: 1~2s speed improvement (from 90% HDD 
speed to 100% HDD speed)

Changing the buffer size gave different reading speeds, but both 
implementations changed by about the same amount when the buffer size changed.
Finally, I could always confirm that I/O was the slowest thing while 
this code was running.

For those wondering about the file size: it is both to avoid the OS cache and 
to test reading in the main use case these objects are meant for (large 
streams of bytes).

@Pavel, are you open for discussion now ;)? Need anything else?

On 21/10/2016 19:21, Pavel Rappo wrote:
Just to append to my previous email. BufferedReader wraps any Reader out there.
Not specifically FileReader. While you're talking about the case of effective
reading from a file.

I guess there's one existing possibility to provide exactly what you need (as I
understand it) under this method:

/**
  * Opens a file for reading, returning a {@code BufferedReader} to read text
  * from the file in an efficient manner...
    ...
  */
java.nio.file.Files#newBufferedReader(java.nio.file.Path)

It can return _anything_ as long as it is a BufferedReader. We can do it, but it
needs to be investigated not only for your favorite OS but for other OSes as
well. Feel free to prototype this and we can discuss it on the list later.
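
For reference, a minimal usage sketch of the existing method mentioned above. The class and method names here (NewBufferedReaderSketch, countLines) and the temporary file are hypothetical and only there to make the example self-contained. The caller only sees a BufferedReader, which is why the returned implementation could in principle change without affecting callers.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

class NewBufferedReaderSketch {

    // Counts the lines of a text file through the reader returned by
    // Files.newBufferedReader (UTF-8 by default).
    static long countLines(Path file) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            long lines = 0;
            while (reader.readLine() != null) {
                lines++;
            }
            return lines;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical input: a small temporary file
        Path file = Files.createTempFile("sketch", ".txt");
        try {
            Files.write(file, Arrays.asList("first", "second", "third"));
            System.out.println("Lines: " + countLines(file));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```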

Thanks,
-Pavel

On 21 Oct 2016, at 18:56, Brunoais <brunoa...@gmail.com> wrote:

Pavel is right.

In reality, I was expecting such BufferedReader to use only a single buffer and 
have that Buffer being filled asynchronously, not in a different Thread.
Additionally, I don't have the intention of having a larger buffer than before 
unless stated through the API (the constructor).

In my idea, internally, it is supposed to use 
java.nio.channels.AsynchronousFileChannel or equivalent.

It does not prevent having two buffers and I do not intend to change 
BufferedReader itself. I'd do a BufferedAsyncReader of sorts (any name 
suggestion is welcome as I'm an awful namer).
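
A minimal sketch of the idea discussed in this message, under stated assumptions: it uses java.nio.channels.AsynchronousFileChannel with two buffers, so the next read is already in flight while the caller works on the previous chunk, and it creates no thread of its own. The class name, the sumBytes method and the "work" done per chunk (summing byte values) are hypothetical. It is not the prototype attached to this thread.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

class AsyncReadSketch {

    // Sums all byte values of a file while overlapping reading and processing.
    static long sumBytes(Path file, int bufferSize) throws Exception {
        try (AsynchronousFileChannel channel =
                     AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer current = ByteBuffer.allocate(bufferSize);
            ByteBuffer next = ByteBuffer.allocate(bufferSize);
            long position = 0;
            long sum = 0;

            // Start the first read; it completes in the background
            Future<Integer> pending = channel.read(current, position);
            while (true) {
                int bytesRead = pending.get(); // wait for the read in flight
                if (bytesRead < 0) {
                    break; // end of file
                }
                position += bytesRead;

                // Request the following chunk before processing this one
                next.clear();
                pending = channel.read(next, position);

                // "Work" on the chunk that just arrived
                current.flip();
                while (current.hasRemaining()) {
                    sum += current.get() & 0xff;
                }

                // Swap roles: the buffer being filled becomes the current one
                ByteBuffer tmp = current;
                current = next;
                next = tmp;
            }
            return sum;
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("async-sketch", ".bin");
        try {
            Files.write(file, new byte[] {1, 2, 3, (byte) 255});
            System.out.println("Sum: " + sumBytes(file, 2));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Whether this actually overlaps I/O with computation depends on how the platform implements asynchronous file reads, which is the cross-OS question raised elsewhere in this thread.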


On 21/10/2016 18:38, Roger Riggs wrote:
Hi Pavel,

I think Brunoais is asking for a double buffering scheme in which the 
implementation of BufferedReader fills (a second buffer) in parallel with the 
application reading from the 1st buffer, and manages the swaps and async reads 
transparently.
It would not change the API but would change the interactions between the 
buffered reader
and the underlying stream.  It would also increase memory requirements and 
processing
by introducing or using a separate thread and the necessary synchronization.

Though I think the formal interface semantics could be maintained, I have doubts
about compatibility and its unintended consequences on existing subclasses,
applications and libraries.

$.02, Roger

On 10/21/16 1:22 PM, Pavel Rappo wrote:
Off the top of my head, I would say it's not possible to change the design of an
_extensible_ type that has been out there for 20 or so years. All these I/O
streams from java.io were designed for simple synchronous use case.

It's not that their design is flawed in some way, it's that they don't seem to
suit your needs. Have you considered using 
java.nio.channels.AsynchronousFileChannel
in your applications?

-Pavel

On 21 Oct 2016, at 17:08, Brunoais <brunoa...@gmail.com> wrote:

Any feedback on this? I'm really interested in implementing such 
BufferedReader/BufferedStreamReader to allow speeding up my applications 
without having to think in an asynchronous way or multi-threading while 
programming with it.

That's why I'm asking this here.


On 13/10/2016 14:45, Brunoais wrote:
Hi,

I looked at the BufferedReader source code for Java 9 along with the source 
code of the channels/streams used. I noticed that, as in Java 7, BufferedReader 
does not use an async API to load data from files. Instead, all the data 
loading is done synchronously, even when the OS allows requesting a file read 
and being notified later when the data has actually been read.

Why is BufferedReader not async while providing a sync API?



package pocs.java;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;

public class BufferedNonBlockStream extends BufferedInputStream {

        private static final int DEFAULT_BUFFER_SIZE = 8_192;
        
        /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

        private static final int MIN_READ_BUFFER_SIZE = 10;
        /**
     * The maximum size of the read buffer if set automatically.
     * Unless explicitly told, there's no need to starve the VM out of memory
     */
        private static final int MAX_READ_BUFFER_SIZE = 1_000_000;
        
        /**
     * The virtual index of the next position where data will be read.
     * Every operation requires executing a mathematical mod ({@code %})
     * operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     */
    protected long readPos;
    
    /**
     * The virtual index of the next position where data will be written.
     * Every operation requires executing a mathematical mod ({@code %})
     * operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     * 
     * At all times, {@code readPos <= writePos} must be true.
     * 
     */
    // Note to reviewers: What to do when long overflows? It will happen but
    // does a documented warning suffice or... do I need to work with the overflows?
    protected long writePos;
        
        ReadableByteChannel inputChannel;
        ByteBuffer readBuffer;
        
        boolean isClosed;
        boolean eof;

        public BufferedNonBlockStream(InputStream inputStream) {
                super(inputStream);
        }
        
        public BufferedNonBlockStream(ReadableByteChannel inputChannel) {
                this(inputChannel, DEFAULT_BUFFER_SIZE);
        }
        
        public BufferedNonBlockStream(ReadableByteChannel inputChannel, int bufferSize) {
                this(inputChannel, bufferSize,
                                Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
        }
        
        public BufferedNonBlockStream(ReadableByteChannel inputChannel, int bufferSize, int readBufferSize) {
                super(null, bufferSize);
                Objects.requireNonNull(inputChannel, "The byte channel must not be null");
                this.inputChannel = inputChannel;
                
                if(readBufferSize < 1){
                        throw new IllegalArgumentException("Read buffer must be at least 1 byte");
                }
                
                if(readBufferSize > bufferSize){
                        throw new IllegalArgumentException("readBufferSize must not be larger than bufferSize");
                }
                
                // The read buffer must not be larger than the internal buffer.
                readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
                this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
                isClosed = false;
        }
        
        int readSpaceAvailable(){
                // This can be an int because the buf size can never be larger than an int
                return (int) (writePos - readPos);
        }
        int writeSpaceAvailable(){
                return buf.length - readSpaceAvailable();
        }
        
        void fill() throws IOException {
                fill(false);
        }
    void blockingFill(boolean forced) throws IOException {
                fill(forced);
                // Stop waiting once the stream has ended; otherwise this loops forever at EOF
                while(readPos == writePos && !eof){
                        try {
                                Thread.sleep(100);
                        } catch (InterruptedException e) {
                                // An interrupt may mean more data is available
                        }
                        fill(forced);
        }
    }
    
    synchronized void fill(boolean forced) throws IOException {
        isClosed = !inputChannel.isOpen();
        
        eof = inputChannel.read(readBuffer) == -1;
        
        // For the poc, markers are not coded
        
        long freeBufSpace = writeSpaceAvailable();
        
        if(     (forced && readBuffer.position() > 0) ||
                // It's no use reading a very small quantity from the buffer
            // Please review that.
                (freeBufSpace > readBuffer.capacity() / 2 && freeBufSpace >= readBuffer.position())
        ){
                readBuffer.flip();
            // Ints are faster and an array can only be indexed using an int
            int writeBufferFrom = (int) (writePos % buf.length);
            int canReadAmount =  Math.min((int) freeBufSpace, readBuffer.remaining());
            
                // Copy in at most 2 parts: up to the end of buf, then from its start.
                // Computing the first part explicitly also covers canReadAmount == buf.length
                int firstPart = Math.min(canReadAmount, buf.length - writeBufferFrom);
                readBuffer.get(buf, writeBufferFrom, firstPart);
                readBuffer.get(buf, 0, canReadAmount - firstPart);

                writePos += canReadAmount;
                // Keep any bytes that did not fit and make the buffer ready for more reading
                readBuffer.compact();
        }
        
    }
        
        @Override
        public int read() throws IOException {
                if(inputChannel == null){
                        return super.read();                    
                }
                fill(false);
                if(readPos == writePos){
                        // Get anything that is in the buffer
                        blockingFill(true);
                        if(readPos == writePos){
                                // Still nothing to read: the stream has ended
                                return -1;
                        }
                }
                
                return buf[(int)(readPos++ % buf.length)] & 0xff;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
                if(inputChannel == null){
                        return super.read(b, off, len);
                }
                // Always fill. Even when rejecting
                fill();
                if(eof && readSpaceAvailable() == 0){
                        // Buffer emptied and the stream ended
                        return -1;
                }
                if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
                
                int bytesRead = 0;
                
                // For a ReadableChannel, assume that it has always read as much as it could
                if(readSpaceAvailable() == 0){
                        fill(true);
                        if(readSpaceAvailable() == 0){
                                return eof ? -1 : 0;
                        }
                }

                // Note for reviewing: Some optimizations that take some time were not made here.
                // E.g.
                // -> If b consumes the whole buf, copy directly from readBuffer to b
                // -> As long as reading buffers are large enough (requires benchmarking),
                //        do not use the buf array at all (late initialization too)

        int readBufferFrom = (int) (readPos % buf.length);
        // len is already the maximum number of bytes to copy; off is only the offset into b
        int canReadAmount =  Math.min(readSpaceAvailable(), len);
        
                // For reviewer: Should it always read once? Or should it read twice?
                // Copy in at most 2 parts: up to the end of buf, then from its start.
                // Computing the first part explicitly also covers canReadAmount == buf.length
                int firstPart = Math.min(canReadAmount, buf.length - readBufferFrom);
                System.arraycopy(buf, readBufferFrom, b, off, firstPart);
                System.arraycopy(buf, 0, b, off + firstPart, canReadAmount - firstPart);
                readPos += canReadAmount;
                return canReadAmount;
                
                
        }

        @Override
        public long skip(long arg0) throws IOException {
                if(inputChannel == null){
                        return super.skip(arg0);
                }
                
                // There's no skip in this poc
                // For the real implementation, I'd do (while I didn't skip enough):
                // I'd just "jump" the readPos accordingly (while keeping readPos <= writePos)
                // Then 
                // reposition (.position(int)) in buffer
                // OR
                // clear the buffer
                // Then
                // If SeekableChannel -> use (sort of) .position(.position() + skipLeft)
                // else -> do not seek.
                
                
                throw new UnsupportedOperationException();
                
        }

        @Override
        public int available() throws IOException {
                if(inputChannel == null){
                        return super.available();
                }
                fill();
                // For now, just return what I am sure I have
                return readSpaceAvailable();
                
        }

        @Override
        public void mark(int arg0) {
                if(inputChannel == null){
                        super.mark(arg0);
                }
                
                // This is not hard to do but it requires quite some time!
                // I'll wait first if the poc passes without this method
        }

        @Override
        public void reset() throws IOException {
                if(inputChannel == null){
                        super.reset();
                        // Without this return, the exception below was thrown even after a successful reset
                        return;
                }
                throw new IOException("The mark was lost");
        }

        @Override
        public boolean markSupported() {
                if(inputChannel == null){
                        return super.markSupported();
                }
                // false for now, at least.
                return false;
        }

        @Override
        public void close() throws IOException {
                if(inputChannel != null){
                        inputChannel.close();
                }
                super.close();
        }

}
package pocs.java;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.util.zip.CRC32;

public class Tests {
        
        private static final int PROGRAM_BUFFER = 200;
        private static final int READ_BUFFER = 500_000;
        private static final boolean EXECUTE_WORK = true;

        static void buildTestFile() throws Exception{

                BufferedOutputStream outputing = new BufferedOutputStream(new 
FileOutputStream("TestFile"), 30000);
                
                for (int i = 0; i < 6_000_000; i++) {
                        for (int j = 0; j < 255; j++) {
                                outputing.write(j);
                        }
                }
                
                outputing.close();
        }
        
        static void theRead(BufferedInputStream inputing) throws Exception{
                byte[] myBuf = new byte[PROGRAM_BUFFER];
                int readAmount = 0;
                byte expectingByte = 0x00;
                int strikes = 0;
                
                int readCount = 0;
                
                while((readAmount = inputing.read(myBuf)) != -1){
                        readCount++;
                        for (int i = 0; i < readAmount; i++) {
                                if(expectingByte == -1){
                                        expectingByte = 0;
                                }
                                if(expectingByte != myBuf[i]){
                                        System.out.println("For byte " + (int) 
myBuf[i] + " expected " + (int) expectingByte );
                                        if(strikes++ > 50){
                                                return;
                                        }
                                }
                                expectingByte += 1;
                        }
                        if(EXECUTE_WORK){
                                // doing work
                                for (int i = 0; i < 30; i++) {
                                        new CRC32().update(myBuf);      
                                }
                        }
                }
        }
        
        static void normalRead() throws Exception {

                try(BufferedInputStream inputing = new BufferedInputStream(new 
FileInputStream("TestFile"), READ_BUFFER);){
                        theRead(inputing);
                }
        }
        

        static void myRead() throws Exception {

                try(BufferedInputStream inputing = new BufferedNonBlockStream(
                                        FileChannel.open(new 
File("TestFile").toPath()), READ_BUFFER
                                );
                        ){
                        theRead(inputing);
                }
        }
        
        public static void main(String[] args) throws Exception {

//              buildTestFile();
                long start = System.currentTimeMillis();
//              normalRead();
                myRead();
                System.out.println("Time! " + (System.currentTimeMillis() - 
start));
                
                
        }
}
