Here's my try on going async.

In my tests on my local Windows 10 machine (I don't have access to the Linux one without a VM at the moment), now with a 1GB file, I noticed:

~2s of speed improvement going from BufferedInputStream to BufferedNonBlockStream when BufferedNonBlockStream is in its most advantageous situation (with CPU work between reads). Otherwise, the improvement is ~0.3s, maybe less.

~0.8s of speed improvement going from BufferedNonBlockStream to BufferedAsyncStream when both are in their most advantageous situation. Otherwise, ~0 speed improvement.

I also noticed: up to the point where processing takes as long as reading, both BufferedNonBlockStream and BufferedAsyncStream increase their advantage over BufferedInputStream. Once processing takes longer than the I/O, BufferedNonBlockStream and BufferedAsyncStream tend to keep that advantage.

If the file is in cache, BufferedInputStream takes ~1.6-1.9s to read the file, BufferedNonBlockStream takes a steady ~2.05-2.1s to read the file and BufferedAsyncStream ~1.2s.

BufferedNonBlockStream and BufferedAsyncStream benefit more from extra buffer memory than BufferedInputStream does, but only up to a certain limit. On my hardware, the best speed I got was with 655360B for the new ones. Anything more than that produced no visible gain. I guess that is due to how fast the test was processing the data.
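
For anyone wanting to reproduce this kind of comparison, a harness along the following lines would do. It is only a sketch under stated assumptions, not the attached Tests.java: the class name ReadTimingHarness, the 8_192-byte chunk size and the spin loop standing in for "CPU work between reads" are all hypothetical.

```java
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical harness: drains a stream in chunks and optionally burns CPU
// after every chunk to imitate processing between reads.
final class ReadTimingHarness {

    // Written at the end so the JIT cannot discard the fake work as dead code.
    static volatile long sink;

    /** Reads in to EOF and returns the number of bytes read. chunkSize must be > 0. */
    static long drain(InputStream in, int chunkSize, int workIterations) {
        byte[] chunk = new byte[chunkSize];
        long total = 0;
        long acc = 0;
        try {
            int n;
            while ((n = in.read(chunk)) != -1) {
                total += n;
                // Fake work: touch the bytes just read, workIterations times.
                for (int i = 0; n > 0 && i < workIterations; i++) {
                    acc += chunk[i % n];
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        sink = acc;
        return total;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: file to read; args[1] (optional): fake work per chunk.
        int work = args.length > 1 ? Integer.parseInt(args[1]) : 0;
        try (InputStream in = new BufferedInputStream(new FileInputStream(args[0]))) {
            long start = System.nanoTime();
            long bytes = drain(in, 8_192, work);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println(bytes + " bytes in " + millis + " ms");
        }
    }
}
```

Swapping the BufferedInputStream in main for the stream under test gives the other side of the comparison; the fake-work parameter is what moves a run between the "most advantageous" and "otherwise" cases described above.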

On 28/10/2016 09:16, Brunoais wrote:
I'll try going back to a previous version I worked on, which used Java 7's AsynchronousFileChannel, and work from there. My brief research shows it can also work with AsynchronousFileChannel mostly without changes.

For now, 1 question:
Is Thread.sleep() an acceptable way of meeting the blocking requirement of read()? Or do I need to use LockSupport.park() or something like that?

I'll report back here when it is done.
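
On the sleep()-versus-park() question, one possible shape for the waiting side is sketched below. It assumes a single producer (for example the CompletionHandler of an asynchronous read) and a single consumer; the class ParkUntilData and its method names are hypothetical, and the two counters only stand in for the real buffer positions.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-producer/single-consumer sketch: the consumer parks
// until the producer announces data, instead of polling with Thread.sleep().
final class ParkUntilData {
    private volatile long writePos;   // written only by the producer
    private long readPos;             // touched only by the consumer
    private volatile Thread waiter;   // consumer currently waiting, or null

    /** Producer side: announce that n more bytes are available. */
    void published(int n) {
        writePos = writePos + n;      // safe only because there is one producer
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t);    // harmless if the consumer is not parked yet
        }
    }

    /** Consumer side: block until a byte is available, then consume it. */
    void awaitByte() {
        waiter = Thread.currentThread();
        try {
            // park() may return spuriously or on interrupt, so re-check in a loop.
            while (readPos == writePos) {
                LockSupport.park(this);
            }
        } finally {
            waiter = null;
        }
        readPos++;
    }

    /** Bytes published but not yet consumed; call from the consumer thread. */
    long available() {
        return writePos - readPos;
    }

    public static void main(String[] args) {
        ParkUntilData data = new ParkUntilData();
        new Thread(() -> data.published(2)).start();
        data.awaitByte();
        data.awaitByte();
        System.out.println("left: " + data.available());
    }
}
```

Because unpark() leaves a permit behind, a wake-up issued just before the consumer parks is not lost. One caveat: while the thread's interrupt status is set, park() returns immediately, so a real read() would still have to decide how to treat interruption.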


On 27/10/2016 22:09, David Holmes wrote:
You might try discussing on net-dev rather than core-libs-dev, to get additional historical info related to the io and nio file APIs.

David

On 28/10/2016 5:08 AM, Brunoais wrote:
You are right. Even on Windows it does not set the flags for async
reads. It seems that it is Windows itself that decides to buffer the
contents based on its own heuristics.

But... Why? Why won't it be? Why is there no API for it? How am I
getting 100% HDD use and faster times when I fake work to delay getting
more data and I only have a fluctuating 60-90% (always going up and
down) when I use an InputStream?
Is it related to how both classes cache and how frequently and how much
each one asks for data?

I really would prefer not having to read the source code because it
takes a real long time T.T.

I end up restating... And wondering...

Why doesn't Java provide a single-threaded non-blocking API for file
reads on every OS that supports it? I simply cannot find that
information no matter how much I search on google, bing, duck duck
go... Can any of you point me to whoever knows?

On 27/10/2016 14:11, Vitaly Davidovich wrote:
I don't know about Windows specifically, but generally file systems
across major OS's will implement readahead in their IO scheduler when
they detect sequential scans.

On Linux, you can also strace your test to confirm which syscalls are
emitted (you should be seeing plain read()'s there, with
FileInputStream and FileChannel).
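
For reference, the sequential scan being discussed can be sketched as follows. Each FileChannel.read(ByteBuffer) here is an ordinary blocking call from the Java side; any overlap with the disk would have to come from the OS, as described above. The class SequentialChannelScan and its helper are hypothetical; the boolean switches between a heap and a direct buffer so the two can be compared.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of a plain sequential scan through a FileChannel.
final class SequentialChannelScan {

    /** Reads the whole file through buffer and returns the byte count. */
    static long scan(Path file, ByteBuffer buffer) {
        long total = 0;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            buffer.clear();
            int n;
            // The channel keeps its own position, so each call continues
            // where the previous one stopped.
            while ((n = channel.read(buffer)) != -1) {
                total += n;
                buffer.clear();   // discard the data; a real reader would consume it
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    /** Writes a temporary file of size bytes, scans it, and deletes it. */
    static long scanTempFile(int size, boolean direct) {
        try {
            Path tmp = Files.createTempFile("scan", ".bin");
            try {
                Files.write(tmp, new byte[size]);
                ByteBuffer buffer = direct
                        ? ByteBuffer.allocateDirect(8_192)
                        : ByteBuffer.allocate(8_192);
                return scan(tmp, buffer);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(scanTempFile(1 << 20, false));
        System.out.println(scanTempFile(1 << 20, true));
    }
}
```

Running the real test under strace, as suggested, would show which syscalls a loop like this actually emits on a given platform.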

On Thu, Oct 27, 2016 at 9:06 AM, Brunoais <brunoa...@gmail.com> wrote:

    Thanks for the heads up.

    I'll try that later. These tests are still useful then. Meanwhile,
    I'll also check how FileChannel queries the OS on Windows. I'm
    getting 100% HDD reads... Could it be that the OS reads the file
    ahead on its own?... Anyway, I'll look into it.
    Thanks for the heads up.


    On 27/10/2016 13:53, Vitaly Davidovich wrote:


    On Thu, Oct 27, 2016 at 8:34 AM, Brunoais <brunoa...@gmail.com> wrote:

        Oh... I see. In that case, it means something is terribly
        wrong. It can be my initial tests, though.

        I'm testing on both Linux and Windows and I'm getting
        performance gains from using the FileChannel compared to
        using FileInputStream... The tests also make sense based on
        my predictions O_O...

    FileInputStream requires copying native buffers holding the read
    data to the java byte[].  If you're using direct ByteBuffer for
    FileChannel, that whole memcpy is skipped.  Try comparing
    FileChannel with HeapByteBuffer instead.


        On 27/10/2016 11:47, Vitaly Davidovich wrote:


        On Thursday, October 27, 2016, Brunoais <brunoa...@gmail.com> wrote:

            Did you read the C code?

        I looked at the Linux code in the JDK.

            Have you got any idea how many functions Windows or
            Linux (nearly all flavors) have for the read operation
            towards a file?

        I do.


            I have already done that homework myself. I may not have
            read the JVM's source code but I know well that there are
            functions on both Windows and Linux that provide the kind
            of interface I mentioned, although they require slightly
            different treatment (and different constants).

        You should read the JDK (native) source code instead of
        guessing/assuming.  On Linux, it doesn't use aio facilities
        for files.  The kernel io scheduler may issue readahead
        behind the scenes, but there's no nonblocking file io that's
        at the heart of your premise.



            On 27/10/2016 00:06, Vitaly Davidovich wrote:



                On Wednesday, October 26, 2016, Brunoais <brunoa...@gmail.com> wrote:

                    It is actually based on the premise that:

                    1. The first call to ReadableByteChannel.read(ByteBuffer)
                       sets the OS buffer size to fill in as the same size as
                       ByteBuffer.

                Why do you say that? AFAICT, it issues a read
                syscall and that will block if the data isn't in
                page cache.

                    2. The consecutive calls to
                       ReadableByteChannel.read(ByteBuffer) order the JVM to
                       order the OS to execute memcpy() to copy from its memory
                       to the shared memory created at ByteBuffer instantiation
                       (in java 8) using Unsafe and then for the JVM to update
                       the ByteBuffer fields.

                I think subsequent reads just invoke the same read
                syscall, passing the current file offset maintained
                by the file channel instance.

                    3. The call will not block waiting for I/O and it won't
                       take longer than the JNI interface if no new data
                       exists. However, it will block waiting for the OS to
                       execute memcpy() to the shared memory.

                So why do you think it won't block?


                    Is my premise wrong?

                    If I read correctly, if I don't use a DirectBuffer, there
                    would be even another intermediate buffer to copy data to
                    before giving it to the "user" which would be useless.

                If you use a HeapByteBuffer, then there's an extra
                copy from the native buffer to the Java buffer.



                    On 26/10/2016 11:57, Pavel Rappo wrote:

                        I believe I see where you're coming from. Please
                        correct me if I'm wrong.

                        Your implementation is based on the premise that a call
                        to ReadableByteChannel.read() _initiates_ the operation
                        and returns immediately. The OS then continues to fill
                        the buffer while there's free space in the buffer and
                        the channel hasn't encountered EOF.

                        Is that right?

                            On 25 Oct 2016, at 22:16, Brunoais <brunoa...@gmail.com> wrote:

                            Thank you for your time. I'll try to explain it. I hope I
                            can clear it up.
                            First of all, I mixed up the meanings of asynchronous
                            and non-blocking. This implementation uses a non-blocking
                            algorithm internally while providing a blocking-like
                            algorithm on the surface. It is single-threaded, not
                            multi-threaded where one thread fetches data and blocks
                            waiting and the other accumulates it and provides it to
                            whoever wants it.

                            Second, I made the mistake of going after BufferedReader
                            instead of going after BufferedInputStream. If you want me
                            to go after BufferedReader that's OK, but when I started
                            the poc I thought that going after BufferedInputStream
                            would be more generically useful than BufferedReader.

                            On to my code:
                            Short answers:
                                    • The sleep(int) exists because I don't know how
                                      to wait until more data exists in the buffer,
                                      which is part of read()'s contract.
                                    • The ByteBuffer gives a buffer that is filled by
                                      the OS (what I believe Channels do) instead of
                                      getting data only on demand (what I believe
                                      Streams do).
                            Full answers:
                            The blockingFill(boolean) method is a busy wait for a
                            fill and is used exclusively by the read() method. All
                            other methods use the version that does not sleep
                            (fill(boolean)). blockingFill(boolean) only exists in
                            that form because the read() method must not return
                            unless either:

                                    • The stream ended.
                                    • The next byte is ready for reading.
                            Additionally, statistically, that while loop will rarely
                            evaluate to true as reads are in chunks, so readPos will
                            be behind writePos most of the time.
                            I have no idea if an interrupt will ever happen, to be
                            honest. The main reasons why I'm using a sleep are that
                            I didn't want to hog the CPU in a full-thread busy wait
                            and that I didn't find any way of putting the thread to
                            sleep so that it wakes up later when the buffer managed
                            by native code has more data.
                            The non-blocking part is managed by the buffer the OS
                            keeps filling most if not all the time. That buffer is the
                            field

                            ByteBuffer readBuffer
                            That's the gain over the plain old Buffered classes.


                            Did that make sense to you? Feel free to ask anything else
                            you need.

                            On 25/10/2016 20:52, Pavel Rappo wrote:

                                I've skimmed through the code and I'm not sure I can
                                see any asynchronicity (you were pointing at the lack
                                of it in BufferedReader).
                                And the mechanics of this is very puzzling to me, to
                                be honest:
                                     void blockingFill(boolean forced) throws IOException {
                                         fill(forced);
                                         while (readPos == writePos) {
                                             try {
                                                 Thread.sleep(100);
                                             } catch (InterruptedException e) {
                                                 // An interrupt may mean more data is available
                                             }
                                             fill(forced);
                                         }
                                     }
                                I thought you were suggesting that we should utilize
                                the tools which OS provides more efficiently. Instead
                                we have something that looks very similar to a
                                "busy loop" and... also who and when is supposed to
                                interrupt Thread.sleep()?
                                Sorry, I'm not following. Could you please explain how
                                this is supposed to work?

                                    On 24 Oct 2016, at 15:59, Brunoais <brunoa...@gmail.com> wrote:
                                    Attached and sending!
                                    On 24/10/2016 13:48, Pavel Rappo wrote:

                                        Could you please send a new email on this list
                                        with the source attached as a text file?

                                            On 23 Oct 2016, at 19:14, Brunoais <brunoa...@gmail.com> wrote:
                                            Here's my poc/prototype:

                                            http://pastebin.com/WRpYWDJF

                                            I've implemented the bare minimum of the class that
                                            follows the same contract as BufferedReader while
                                            flagging, in comments, all the issues I think it may
                                            have or has.
                                            I also wrote some javadoc to help guide readers
                                            through the class.
                                            I could have used more fields from BufferedReader but
                                            the names were so minimalistic that they were
                                            confusing me. I intend to change them before sending
                                            this to openJDK.
                                            One of the major problems this has is long overflow.
                                            It is major because it is hidden, it will be extremely
                                            rare and it takes a really long time to reproduce.
                                            There are different ways of dealing with it, from just
                                            documenting it to actually writing code that handles
                                            it.
                                            I built some simple test code for it to get some idea
                                            of performance and correctness.

                http://pastebin.com/eh6LFgwT

                                            This doesn't thoroughly test whether it is actually
                                            working correctly, but I see no reason for it not to
                                            work correctly after fixing the 2 bugs that test found.
                                            I'll also leave here some conclusions about speed and
                                            resource consumption I found.
                                            I made tests with default buffer sizes, 5000B, 15_000B
                                            and 500_000B. I noticed that, with my hardware, with
                                            the 1 530 000 000B file, I was getting around:
                                            In all buffers and fake work: 10~15s speed improvement
                                            (from 90% HDD speed to 100% HDD speed)
                                            In all buffers and no fake work: 1~2s speed improvement
                                            (from 90% HDD speed to 100% HDD speed)
                                            Changing the buffer size was giving different reading
                                            speeds but both were quite equal in how much they would
                                            change when changing the buffer size.
                                            Finally, I could always confirm that I/O was always the
                                            slowest thing while this code was running.
                                            For the ones wondering about the file size: it is both
                                            to avoid the OS cache and to exercise the main use case
                                            these objects are for (large streams of bytes).
                                            @Pavel, are you open for discussion now ;)? Need
                                            anything else?
                                            On 21/10/2016 19:21, Pavel Rappo wrote:

                Just to append to my previous email. BufferedReader wraps any
                Reader out there. Not specifically FileReader. While you're
                talking about the case of effective reading from a file.
                I guess there's one existing possibility to provide exactly
                what you need (as I understand it) under this method:
                /**
                  * Opens a file for reading, returning a {@code BufferedReader}
                  * to read text from the file in an efficient manner...
                    ...
                  */
                java.nio.file.Files#newBufferedReader(java.nio.file.Path)
                It can return _anything_ as long as it is a BufferedReader. We
                can do it, but it needs to be investigated not only for your
                favorite OS but for other OSes as well. Feel free to prototype
                this and we can discuss it on the list later.
                Thanks,
                -Pavel

                    On 21 Oct 2016, at 18:56, Brunoais <brunoa...@gmail.com> wrote:
                    Pavel is right.
                    In reality, I was expecting such a BufferedReader to use only
                    a single buffer and have that buffer filled asynchronously,
                    not in a different Thread.
                    Additionally, I don't intend to have a larger buffer than
                    before unless stated through the API (the constructor).
                    In my idea, internally, it is supposed to use
                    java.nio.channels.AsynchronousFileChannel or equivalent.
                    It does not prevent having two buffers and I do not intend to
                    change BufferedReader itself. I'd do a BufferedAsyncReader of
                    sorts (any name suggestion is welcome as I'm an awful namer).
                    On 21/10/2016 18:38, Roger Riggs wrote:

                        Hi Pavel,
                        I think Brunoais is asking for a double buffering scheme
                        in which the implementation of BufferedReader fills (a
                        second buffer) in parallel with the application reading
                        from the 1st buffer and managing the swaps and async
                        reads transparently.
                        It would not change the API but would change the
                        interactions between the buffered reader and the
                        underlying stream.  It would also increase memory
                        requirements and processing by introducing or using a
                        separate thread and the necessary synchronization.
                        Though I think the formal interface semantics could be
                        maintained, I have doubts about compatibility and its
                        unintended consequences on existing subclasses,
                        applications and libraries.
                        $.02, Roger
                        On 10/21/16 1:22 PM, Pavel Rappo wrote:

                            Off the top of my head, I would say it's not possible to
                            change the design of an _extensible_ type that has been
                            out there for 20 or so years. All these I/O streams from
                            java.io were designed for the simple synchronous use
                            case. It's not that their design is flawed in some way,
                            it's that they don't seem to suit your needs. Have you
                            considered using
                            java.nio.channels.AsynchronousFileChannel in your
                            applications?
                            -Pavel

                                On 21 Oct 2016, at 17:08, Brunoais <brunoa...@gmail.com> wrote:
                                Any feedback on this? I'm really interested in
                                implementing such a BufferedReader/BufferedStreamReader
                                to allow speeding up my applications without having to
                                think in an asynchronous way or about multi-threading
                                while programming with it.
                                That's why I'm asking this here.
                                On 13/10/2016 14:45, Brunoais wrote:

                                    Hi,
                                    I looked at the BufferedReader source code for java 9
                                    along with the source code of the channels/streams
                                    used. I noticed that, like in java 7, BufferedReader
                                    does not use an async API to load data from files;
                                    instead, the data loading is all done synchronously
                                    even when the OS allows requesting a file to be read
                                    and getting a notification later when the file has
                                    effectively been read.
                                    Why is BufferedReader not async while providing a
                                    sync API?

<BufferedNonBlockStream.java><Tests.java>













package org.sample.BufferedNon_BlockIO;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.locks.LockSupport;

public class BufferedAsyncStream extends BufferedInputStream {

        private static final int DEFAULT_BUFFER_SIZE = 8_192;
        
        /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

        private static final int MIN_READ_BUFFER_SIZE = 10;
        /**
     * The maximum size of the read buffer if set automatically.
     * Unless explicitly told, there's no need to starve the VM out of memory
     */
        private static final int MAX_READ_BUFFER_SIZE = 1_000_000;
        
        /**
         * The virtual index of the next position where data will be read.
         * Every operation requires executing a mathematical mod ({@code %})
         * operation on it against <code>buf.length</code>
         * to get the correct buffer position to work on.
         */
        protected long readPos;

        /**
         * The virtual index of the next position where data will be written.
         * Every operation requires executing a mathematical mod ({@code %})
         * operation on it against <code>buf.length</code>
         * to get the correct buffer position to work on.
         *
         * At all times, {@code readPos <= writePos} must be true.
         */
        // Note to reviewers: What to do when long overflows? It will happen but
        // does a documented warning suffice or... do I need to work with the overflows?
        protected long writePos;
        
        ChannelReader readFrom;
        ByteBuffer readBuffer;
        FileReadCallback<Object> readCallback;
        IOException nextException;
        
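        // Set from completion-handler threads and read by the consuming thread,
        // so volatile (not transient) is what is needed here.
        volatile boolean isReading;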
        
        Thread parkingThread;
        
        boolean isClosed;
        boolean eof;

        interface ChannelReader extends Closeable{
                void read(ByteBuffer dst);
        }
        class AsyncFileChannelReader implements ChannelReader, CompletionHandler<Integer, ByteBuffer>{
                AsynchronousFileChannel inChannel;
                long currentPosition;
                AsyncFileChannelReader(AsynchronousFileChannel inChannel) {
                        this.inChannel = inChannel;
                        currentPosition = 0;
                }

                @Override
                public void read(ByteBuffer dst) {
                        isReading = true;
                        inChannel.read(dst, currentPosition, dst, this);
                }
                @Override
                public void close() throws IOException{
                        inChannel.close();
                }
                
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                        if(result > 0){
                                currentPosition += result;
                        }
                        readCallback.completed(result, attachment);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                        readCallback.failed(exc, attachment);
                }

        }
        
        class AsyncByteChannelReader implements ChannelReader{
                AsynchronousByteChannel inChannel;
                
                AsyncByteChannelReader(AsynchronousByteChannel inChannel) {
                        this.inChannel = inChannel;
                }
                
                @Override
                public void read(ByteBuffer dst) {
                        isReading = true;
                        inChannel.read(dst, dst, readCallback);
                }
                @Override
                public void close() throws IOException {
                        inChannel.close();
                }
                
        }

        class FileReadCallback<A> implements CompletionHandler<Integer, A>{

                @Override
                public void completed(Integer resultI, A attachment) {
                        // unbox
                        int result = resultI;
                        if(result == -1){
                                eof = true;
                        }
                        
                        long freeBufSpace = writeSpaceAvailable();
                        
                        if(freeBufSpace < readBuffer.position()){
                                // Not enough space in buf to copy from readBuffer.
                                // Just send it back to fill even more
                                readFrom.read(readBuffer);
                                return;
                        }
                        if(readBuffer.position() > readBuffer.capacity() / 2 || (eof && readBuffer.position() > 0)){
                                readBuffer.flip();
                                // Ints are faster and an array can only be defined using an int
                                int writeBufferFrom = (int) (writePos % buf.length);
                                int canReadAmount = Math.min((int) freeBufSpace, readBuffer.remaining());
                                int canWriteUntil = (writeBufferFrom + canReadAmount) % buf.length;

                                // This can only be done like this because ReadBufferSize is smaller than bufferSize
                                // and overflows are ignored
                                if(canWriteUntil < writeBufferFrom){
                                        // Read in 2 parts
                                        readBuffer.get(buf, writeBufferFrom, buf.length - writeBufferFrom);
                                        readBuffer.get(buf, 0, canWriteUntil);
                                } else {
                                        readBuffer.get(buf, writeBufferFrom, canWriteUntil - writeBufferFrom);
                                }

                                writePos += canReadAmount;
                                // Reset the buffer for more reading
                                readBuffer.clear();

                                LockSupport.unpark(parkingThread);
                        }
                        if(eof){
                                isReading = false;
                                return;
                        }
                        readFrom.read(readBuffer);
                }

                @Override
                public void failed(Throwable exc, A attachment) {
                        if(exc instanceof IOException){
                                nextException = (IOException) exc;
                        } else {
                                nextException = new IOException("Failed to read more data to the buffer", exc);
                        }
                        isReading = false;
                }
                
        }
        
                
        public BufferedAsyncStream(InputStream inputStream) {
                super(inputStream);
        }
        
        public BufferedAsyncStream(AsynchronousFileChannel inputChannel) {
                this(inputChannel, DEFAULT_BUFFER_SIZE);
        }
        
        public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize) {
                this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
        }
        
        public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize, int readBufferSize) {
                super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
//              Objects.requireNonNull(inputChannel, "The file channel must not be null");
                init(new AsyncFileChannelReader(inputChannel), Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
        }

        
        public BufferedAsyncStream(AsynchronousByteChannel inputChannel) {
                this(inputChannel, DEFAULT_BUFFER_SIZE);
        }
        
        public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize) {
                this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
        }
        
        public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize, int readBufferSize) {
                super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
//              Objects.requireNonNull(inputChannel, "The byte channel must not be null");
                init(new AsyncByteChannelReader(inputChannel), Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
        }
        
        private void init(ChannelReader inputChannel, int bufferSize, int readBufferSize){

                this.readFrom = inputChannel;
                
                if(readBufferSize < 10){
                        throw new IllegalArgumentException("Read buffer must be at least 10 bytes");
                }
                
                if(readBufferSize > bufferSize){
                        throw new IllegalArgumentException("ReadBufferSize must be smaller than bufferSize");
                }
                
                // The read buffer must not be larger than the internal buffer.
                readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
                this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
                
                this.readCallback = new FileReadCallback<>();
                
                isClosed = false;
                
                readFrom.read(readBuffer);
        }
        
        int readSpaceAvailable(){
                // This can be an int because buf size can never be larger than the int
                return (int) (writePos - readPos);
        }
        int writeSpaceAvailable(){
                return buf.length - readSpaceAvailable();
        }
        
        /**
         * Blocks waiting for a write.
         * After leaving the block, it is guaranteed that at least 1 byte is readable from the buffer,
         * unless the end of the stream was reached.
         * @throws IOException
         */
        // This is /* synchronized */ to make sure only up to 1 thread is parked inside
        void waitForFill() throws IOException {
                while(readSpaceAvailable() < 1 && !eof){
                        try{
                                parkingThread = Thread.currentThread();
                                // wait for 1 second tops. File/web reading really should be faster than that
                                LockSupport.parkNanos(this, 1_000_000_000);
                        }finally{
                                parkingThread = null;
                        }
                }
        }
 
        
        private int readingFormalities() throws IOException {
                if(!isReading){
                        if(nextException != null){
                                try{
                                        throw nextException;
                                } finally {
                                        nextException = null;
                                }
                        }
                        if(eof){
                                return -1;
                        }
                        readFrom.read(readBuffer);
                }
                
                return readSpaceAvailable();
        }
        
        @Override
        public /* synchronized */ int read() throws IOException {
                if(readFrom == null){
                        return super.read();                    
                }
                
                switch(readingFormalities()){
                        case -1:
                                return -1;
                        case 0:
                                waitForFill();
                                /* No Default */
                }
                
                return buf[(int)(readPos++ % buf.length)] & 0xff;
        }

        @Override
        public /* synchronized */ int read(byte[] b, int off, int len) throws IOException {
                if(readFrom == null){
                        return super.read(b, off, len);
                }
                
                if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
                        throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                        return 0;
                }
                
                switch(readingFormalities()){
                        case -1:
                                return -1;
                        case 0:
//                              waitForFill();
                                /* No Default */
                }
                
                // Note for reviewing: Some optimizations that take some time were not made here.
                // E.g.
                // -> If b consumes the whole buff, copy directly from ByteBuffer to this buff
                // -> As long as reading buffs are large enough (requires benchmarking),
                //        do not use the buf array at all (late initialization too)

                int ByteBufferFrom = (int) (readPos % buf.length);
                // len is already the maximum number of bytes to copy; off is only the offset into b
                int canReadAmount = Math.min(readSpaceAvailable(), len);
                int canByteBufferUntil = (ByteBufferFrom + canReadAmount) % buf.length;
        
                if(canByteBufferUntil < ByteBufferFrom){
                        // For reviewer: Should it always read once? Or should it read twice?
                        System.arraycopy(buf, ByteBufferFrom, b, off, buf.length - ByteBufferFrom);
                        int ndStartAt = off + (buf.length - ByteBufferFrom);
                        // This if should never evaluate "true". It's here just to make sure and make breakpoints
                        if(off - ndStartAt >= len){
                                readPos = canByteBufferUntil;
                                return off - ndStartAt;
                        }
                        System.arraycopy(buf, 0, b, ndStartAt, canByteBufferUntil);
                        readPos += (buf.length - ByteBufferFrom) + canByteBufferUntil;
                        return (buf.length - ByteBufferFrom) + canByteBufferUntil;
                } else {
                        System.arraycopy(buf, ByteBufferFrom, b, off, canByteBufferUntil - ByteBufferFrom);
                        readPos += canByteBufferUntil - ByteBufferFrom;
                        return canByteBufferUntil - ByteBufferFrom;
                }
        }

        @Override
        public long skip(long arg0) throws IOException {
                if(readFrom == null){
                        return super.skip(arg0);
                }
                
                // There's no skip in this poc
                // For the real implementation, I'd do (while I didn't skip enough):
                // I'd just "jump" the readPos accordingly (while keeping readPos <= writePos)
                // Then
                // reposition (.position(int)) in buffer
                // OR
                // clear the buffer
                // Then
                // If SeekableChannel -> use (sort of) .position(.position() + skipLeft)
                // else -> do not seek.
                
                
                throw new UnsupportedOperationException();
                
        }

        @Override
        public int available() throws IOException {
                if(readFrom == null){
                        return super.available();
                }
                return readSpaceAvailable();
                
        }

        @Override
        public void mark(int arg0) {
                if(readFrom == null){
                        super.mark(arg0);
                }
                
                // This is not hard to do but it requires quite some time!
                // I'll wait first if the poc passes without this method
        }

        @Override
        public void reset() throws IOException {
                if(readFrom == null){
                        super.reset();
                        return;
                }
                throw new IOException("The mark was lost");
        }

        @Override
        public boolean markSupported() {
                if(readFrom == null){
                        return super.markSupported();
                }
                // false for now, at least.
                return false;
        }

        @Override
        public void close() throws IOException {
                if(readFrom != null){
                        readFrom.close();
                }
                super.close();
        }

}
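
The two-part copy used in completed() and read(byte[], int, int) above can be looked at in isolation. What follows is only a minimal sketch of that wrap-around index arithmetic, assuming the same convention as the class (readPos and writePos grow monotonically and are reduced modulo the array length). The names RingCopySketch and copyOut are hypothetical and not part of the posted code.

```java
import java.util.Arrays;

public class RingCopySketch {

    /**
     * Copies up to len bytes from a circular buffer into dst, starting at dst[off].
     * readPos and writePos are absolute positions (hypothetical names mirroring the class above).
     * Returns the number of bytes copied.
     */
    static int copyOut(byte[] ring, long readPos, long writePos, byte[] dst, int off, int len) {
        int available = (int) (writePos - readPos);      // bytes written but not yet consumed
        int n = Math.min(available, len);                // len is the cap, off is only an offset
        int from = (int) (readPos % ring.length);        // physical start index
        int firstPart = Math.min(n, ring.length - from); // bytes before the end of the array
        System.arraycopy(ring, from, dst, off, firstPart);
        // Remainder (possibly 0 bytes) continues from the start of the array
        System.arraycopy(ring, 0, dst, off + firstPart, n - firstPart);
        return n;
    }

    public static void main(String[] args) {
        byte[] ring = new byte[8];
        // Absolute positions 6..10 land on indices 6, 7, 0, 1, 2
        for (long p = 6; p < 11; p++) {
            ring[(int) (p % ring.length)] = (byte) p;
        }
        byte[] dst = new byte[5];
        int n = copyOut(ring, 6, 11, dst, 0, dst.length);
        System.out.println(n + " " + Arrays.toString(dst)); // prints "5 [6, 7, 8, 9, 10]"
    }
}
```

Computing the length of the first part directly, rather than comparing the wrapped end index with the start index, avoids the ambiguous case where both indices are equal.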
package org.sample.BufferedNon_BlockIO;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
 * A sample file reading benchmark
 */
@BenchmarkMode(Mode.SingleShotTime)
@Fork(value = 1)
@Warmup(iterations = 0, time = 2, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2)
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class BufferedNonBlockStreamBenchmark_try1 {

        public static boolean isWindows = System.getProperty("os.name").contains("Windows");
        
    @Param({"1gig_file"})
    String file;

//    @Param({"4100", "16400", "131100", "1048600"})
    @Param({"4100", "131100", "1048600"})
    int javaBufSize;
    
    @Param({"100", "1000"})
    int readSize;

//    @Param({"0", "1000", "100000"})
    @Param({"0", "1000"})
    long cpuWork;

    // This MUST be smaller than javaBufSize
    // @Param({"4096", "16384", "131072", "1048576"})
    @Param({"BufferedInputStream", "BufferedNonBlockStream|4096", "BufferedNonBlockStream|131072", "BufferedNonBlockStream|1048576",
            "BufferedAsyncStream|4096", "BufferedAsyncStream|131072", "BufferedAsyncStream|1048576"})
    String implType;

    interface ReadOp {
        int read(byte[] buf) throws IOException;
    }

    private ReadOp read;
    private Closeable close;
    private byte[] buf;

    @Setup(Level.Iteration)
    public void setup() throws IOException, InterruptedException {

        clearCache();
        
        String[] type_readSize = implType.split("\\|");

        switch (type_readSize[0]) {
            case "BufferedInputStream":{
                BufferedInputStream in = new BufferedInputStream(
                    new FileInputStream(file), javaBufSize);
                read = in::read;
                close = in::close;
            }
                break;
            case "BufferedNonBlockStream": {
                BufferedNonBlockStream in = new BufferedNonBlockStream(
                    FileChannel.open(new File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
                read = in::read;
                close = in::close;
            }
            break;
            case "BufferedAsyncStream": {
                BufferedAsyncStream in = new BufferedAsyncStream(
                    AsynchronousFileChannel.open(new File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
                read = in::read;
                close = in::close;
            }
            break;
            default:
                throw new IllegalArgumentException(
                    "Invalid parameter 'implType': " + implType);
        }

        buf = new byte[readSize];
    }

    @TearDown(Level.Iteration)
    public void tearDown() throws IOException {
        close.close();
    }

    /**
     * 
     * For linux:
     * 
     * Compile the following C program into clear_cache executable:
     * <pre>{@code
     *
     * #include <stdio.h>
     * #include <string.h>
     * #include <errno.h>
     *
     * #define PROC_FILE "/proc/sys/vm/drop_caches"
     *
     * int main(int argc, char *argv[]) {
     *   FILE *f;
     *   f = fopen(PROC_FILE, "w");
     *   if (f) {
     *     fprintf(f, "3\n");
     *     fclose(f);
     *     return 0;
     *   } else {
     *     fprintf(stderr, "Can't write to: %s: %s\n", PROC_FILE, strerror(errno));
     *     return 1;
     *   }
     * }
     *
     * }</pre>
     * ... and make it SUID root!
     * 
     * For windows:
     * Use the provided clear_cache.exe
     * (source code and original author:)
     * https://gist.github.com/bitshifter/c87aa396446bbebeab29
     * Then run this program with administrator privileges
     */
    private static void clearCache() throws IOException, InterruptedException {
        // spawn an OS command to clear the FS cache...
        if(isWindows){
                new ProcessBuilder("clear_cache.exe").start().waitFor();
        } else {
                new ProcessBuilder("clear_cache").start().waitFor();
        }
    }

    @Benchmark
    public int testRead() throws IOException {
        int nread = 0;
        int n;
        while ((n = read.read(buf)) >= 0) {
            nread += n;
            Blackhole.consumeCPU(cpuWork);
        }
        return nread;
    }
}
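
On the Thread.sleep() versus LockSupport.park() question: a sleeping thread can only be woken early by an interrupt, while a parked thread is released as soon as another thread calls unpark() on it. The following is a minimal sketch of that handshake, in the spirit of waitForFill() and the unpark() call in the completion handler. It assumes a single producer and a single waiting thread. The names ParkSketch, awaitFill and publish are hypothetical and are not taken from the posted classes.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkSketch {
    // Bytes made available by the producer; volatile so the waiter sees updates
    private volatile int filled = 0;
    // The thread currently waiting in awaitFill(), or null if there is none
    private volatile Thread waiter;

    /** Blocks until the producer has published at least one byte. */
    void awaitFill() {
        waiter = Thread.currentThread();
        try {
            // parkNanos() may return early or spuriously, so the condition is re-checked in a loop
            while (filled < 1) {
                LockSupport.parkNanos(this, 1_000_000_000L);
            }
        } finally {
            waiter = null;
        }
    }

    /** Called by the single producer thread once data is in place. */
    void publish(int n) {
        filled = filled + n;         // not atomic; acceptable only with one producer (assumption)
        LockSupport.unpark(waiter);  // unpark(null) has no effect
    }

    int filled() {
        return filled;
    }

    public static void main(String[] args) {
        ParkSketch s = new ParkSketch();
        new Thread(() -> s.publish(3)).start();
        s.awaitFill();
        System.out.println("filled=" + s.filled()); // prints "filled=3"
    }
}
```

Because the producer writes filled before reading waiter, and the waiter writes waiter before reading filled, at least one side observes the other's write. The one-second timeout is then only a safety net rather than the normal wake-up path.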
