Here's my attempt at going async.
In my tests on my local Windows 10 machine (I don't have access to the
Linux one without a VM at the moment), now with a 1GB file, I noticed:
~2s of speed improvement from BufferedInputStream to
BufferedNonBlockStream when BufferedNonBlockStream is at its most
advantageous (with CPU work between reads). Otherwise, it has ~0.3s of
speed improvement; maybe less.
~0.8s of speed improvement from BufferedNonBlockStream to
BufferedAsyncStream when BufferedNonBlockStream/BufferedAsyncStream is
at its most advantageous. Otherwise, ~0 speed improvement.
I also noticed: up to the point where you process as fast as you read,
both BufferedNonBlockStream and BufferedAsyncStream gain an advantage
over BufferedInputStream. From the point where processing takes longer
than the I/O, BufferedNonBlockStream and BufferedAsyncStream tend to
keep that advantage.
If the file is in cache, BufferedInputStream takes ~1.6-1.9s to read the
file, BufferedNonBlockStream takes a steady ~2.05-2.1s and
BufferedAsyncStream ~1.2s.
BufferedNonBlockStream and BufferedAsyncStream win more when given more
buffer memory than BufferedInputStream, but only up to a certain limit.
On my hardware, the best speed I got was with 655360B for the new
classes. Anything more than that produced no visible difference. I guess
that is due to the speed at which the test was processing the data.
On 28/10/2016 09:16, Brunoais wrote:
I'll try going back to a previous version I worked on which used
Java 7's AsynchronousFileChannel and work from there. My small research
shows it can also work with AsynchronousFileChannel mostly without
changes.
For now, one question:
Is Thread.sleep() a workable way of meeting the blocking requirements of
read()? Or do I need to use LockSupport.park() or something like that?
I'll call back here when it is done.
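For what it's worth, a minimal sketch (hypothetical names, not the patch's code) of the park/unpark hand-off that LockSupport enables: the reader parks with a safety timeout and the producer unparks it as soon as data arrives, which avoids the fixed latency of a Thread.sleep() loop:

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    // The consumer parks until the producer publishes data and unparks it;
    // parkNanos guards against a missed unpark (the loop rechecks the condition).
    static volatile int available = 0;
    static volatile Thread waiter;

    static void awaitData() {
        waiter = Thread.currentThread();
        while (available == 0) {
            LockSupport.parkNanos(100_000_000L); // 100ms safety timeout
        }
        waiter = null;
    }

    static void publish(int n) {
        available = n;
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t); // wakes the parked consumer immediately
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread consumer = new Thread(ParkDemo::awaitData);
        consumer.start();
        Thread.sleep(50);
        publish(42);
        consumer.join();
        System.out.println(available);
    }
}
```

Unlike sleep(100), the unpark makes the waiting thread resume at once, and a spurious or timed-out park is harmless because the loop re-tests the condition.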
On 27/10/2016 22:09, David Holmes wrote:
You might try discussing on net-dev rather than core-libs-dev, to get
additional historical info related to the io and nio file APIs.
David
On 28/10/2016 5:08 AM, Brunoais wrote:
You are right. Even on Windows it does not set the flags for async
reads. It seems it is Windows itself that decides whether to buffer the
contents, based on its own heuristics.
But... why? Why would that be? Why is there no API for it? How am I
getting 100% HDD use and faster times when I fake work to delay getting
more data, yet only a fluctuating 60-90% (always going up and down) when
I use an InputStream?
Is it related to how both classes cache, and how frequently and how much
each one asks for data?
I would really prefer not having to read the source code because it
takes a really long time T.T.
I end up reiterating... and wondering...
Why doesn't Java provide a single-threaded non-blocking API for file
reads on all OSes that support it? I simply cannot find that information
no matter how much I search on Google, Bing, DuckDuckGo... Can any of
you point me to someone who knows?
On 27/10/2016 14:11, Vitaly Davidovich wrote:
I don't know about Windows specifically, but generally file systems
across major OS's will implement readahead in their IO scheduler when
they detect sequential scans.
On Linux, you can also strace your test to confirm which syscalls are
emitted (you should be seeing plain read()'s there, with
FileInputStream and FileChannel).
On Thu, Oct 27, 2016 at 9:06 AM, Brunoais <brunoa...@gmail.com> wrote:
Thanks for the heads up.
I'll try that later. These tests are still useful, then. Meanwhile, I'll
also end up checking how FileChannel queries the OS on Windows. I'm
getting 100% HDD reads... Could it be that the OS reads the file ahead
on its own? Anyway, I'll look into it.
On 27/10/2016 13:53, Vitaly Davidovich wrote:
On Thu, Oct 27, 2016 at 8:34 AM, Brunoais <brunoa...@gmail.com> wrote:
Oh... I see. In that case, it means something is terribly wrong. It
could be my initial tests, though.
I'm testing on both Linux and Windows and I'm getting performance gains
from using FileChannel compared to using FileInputStream... The tests
also match my predictions O_O...
FileInputStream requires copying the native buffers holding the read
data into the Java byte[]. If you're using a direct ByteBuffer with
FileChannel, that whole memcpy is skipped. Try comparing FileChannel
with a HeapByteBuffer instead.
On 27/10/2016 11:47, Vitaly Davidovich wrote:
On Thursday, October 27, 2016, Brunoais <brunoa...@gmail.com> wrote:
Did you read the C code?
I looked at the Linux code in the JDK.
Have you got any idea how many functions Windows or
Linux (nearly all flavors) have for the read operation
on a file?
I do.
I have already done that homework myself. I may not have
read the JVM's source code, but I know well that there
are functions on both Windows and Linux that provide the
interface I mentioned, although they require slightly
different treatment (and different constants).
You should read the JDK (native) source code instead of
guessing/assuming. On Linux, it doesn't use aio facilities
for files. The kernel io scheduler may issue readahead
behind the scenes, but there's no nonblocking file io that's
at the heart of your premise.
On 27/10/2016 00:06, Vitaly Davidovich wrote:
On Wednesday, October 26, 2016, Brunoais <brunoa...@gmail.com> wrote:
It is actually based on the premise that:
1. The first call to ReadableByteChannel.read(ByteBuffer)
sets the OS buffer to fill in, with the same size as the
ByteBuffer.
Why do you say that? AFAICT, it issues a read
syscall and that will block if the data isn't in
page cache.
2. The consecutive calls to ReadableByteChannel.read(ByteBuffer)
order the JVM to order the OS to execute memcpy() to copy from
its memory to the shared memory created at ByteBuffer
instantiation (in Java 8) using Unsafe, and then for the JVM to
update the ByteBuffer fields.
I think subsequent reads just invoke the same read
syscall, passing the current file offset maintained
by the file channel instance.
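That offset tracking can be seen with a tiny sketch (scratch temp file, illustrative names): each read() continues from the channel's current position, which advances by the number of bytes read:

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SequentialReads {
    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("seq", ".bin");
        Files.write(tmp, new byte[]{10, 20, 30, 40});
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            ByteBuffer two = ByteBuffer.allocate(2);
            ch.read(two);                      // reads bytes 0-1
            System.out.println(ch.position()); // 2: offset advanced by the read
            two.clear();
            ch.read(two);                      // reads bytes 2-3, no seek needed
            System.out.println(ch.position()); // 4
        }
        Files.delete(tmp);
    }
}
```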
3. The call will not block waiting for I/O and it won't take
longer than the JNI interface if no new data exists. However,
it will block waiting for the OS to execute memcpy() to the
shared memory.
So why do you think it won't block?
Is my premise wrong?
If I read correctly, if I don't use a DirectBuffer, there
would be yet another intermediate buffer to copy data into
before giving it to the "user", which would be useless.
If you use a HeapByteBuffer, then there's an extra
copy from the native buffer to the Java buffer.
On 26/10/2016 11:57, Pavel Rappo wrote:
I believe I see where you coming from.
Please correct me if
I'm wrong.
Your implementation is based on the premise
that a call to
ReadableByteChannel.read()
_initiates_ the operation and returns
immediately. The OS then
continues to fill
the buffer while there's a free space in the
buffer and the
channel hasn't encountered EOF.
Is that right?
On 25 Oct 2016, at 22:16, Brunoais <brunoa...@gmail.com> wrote:
Thank you for your time. I'll try to explain it. I hope I
can clear it up.
First of all, I made a terminology mistake between
asynchronous and non-blocking. This implementation uses a
non-blocking algorithm internally while providing a
blocking-like algorithm on the surface. It is
single-threaded, not multi-threaded where one thread
fetches data and blocks waiting and the other accumulates
it and provides it to whoever wants it.
Second, I made the mistake of going after BufferedReader
instead of going after BufferedInputStream. If you want me
to go after BufferedReader that's OK, but I thought that
going after BufferedInputStream would be more generically
useful than BufferedReader when I started the PoC.
On to my code:
Short answers:
• The sleep(int) exists because I don't know how to wait
until more data exists in the buffer, which is part of
read()'s contract.
• The ByteBuffer gives a buffer that is filled by the OS
(what I believe Channels do) instead of getting data only
on demand (what I believe Streams do).
Full answers:
The blockingFill(boolean) method is a busy wait for a
fill, used exclusively by the read() method. All other
methods use the version that does not sleep
(fill(boolean)).
blockingFill(boolean) exists in that form only because the
read() method must not return unless either:
• the stream ended, or
• the next byte is ready for reading.
Additionally, statistically, that while loop will rarely
evaluate to true, as reads are in chunks, so readPos will
be behind writePos most of the time.
I have no idea if an interrupt will ever happen, to be
honest. The main reasons I'm using a sleep are that I
didn't want to hog the CPU in a full busy wait and that I
didn't find any way of putting the thread to sleep such
that it wakes up when the buffer managed by native code
has more data.
The non-blocking part is managed by the buffer the OS
keeps filling most if not all of the time. That buffer is
the field:
ByteBuffer readBuffer
That's the gaining part over the plain old Buffered
classes.
Did that make sense to you? Feel free to
ask anything else
you need.
On 25/10/2016 20:52, Pavel Rappo wrote:
I've skimmed through the code and I'm not sure I can see
any asynchronicity (you were pointing at the lack of it
in BufferedReader).
And the mechanics of this are very puzzling to me, to be
honest:
void blockingFill(boolean forced) throws IOException {
    fill(forced);
    while (readPos == writePos) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // An interrupt may mean more data is available
        }
        fill(forced);
    }
}
I thought you were suggesting that we should utilize the
tools the OS provides more efficiently. Instead we have
something that looks very similar to a "busy loop" and...
also, who is supposed to interrupt Thread.sleep(), and
when?
Sorry, I'm not following. Could you please explain how
this is supposed to work?
On 24 Oct 2016, at 15:59, Brunoais <brunoa...@gmail.com> wrote:
Attached and sending!
Attached and sending!
On 24/10/2016 13:48, Pavel Rappo wrote:
Could you please send a new email on this list with the
source attached as a text file?
On 23 Oct 2016, at 19:14, Brunoais <brunoa...@gmail.com> wrote:
Here's my poc/prototype:
http://pastebin.com/WRpYWDJF
I've implemented the bare minimum of the class that
follows the same contract as BufferedReader, while
signaling in comments all the issues I think it may have
or has. I also wrote some javadoc to help guide through
the class.
I could have used more fields from BufferedReader, but
the names were so minimalistic that they were confusing
me. I intend to change them before sending this to
OpenJDK.
One of the major problems this has is long overflow. It
is major because it is hidden, it will be extremely rare,
and it takes a really long time to reproduce. There are
different ways of dealing with it, from just documenting
it to actually writing code that handles it.
I built some simple test code for it to get some ideas
about performance and correctness:
http://pastebin.com/eh6LFgwT
This doesn't thoroughly test whether it is actually
working correctly, but I see no reason for it not to work
correctly after fixing the 2 bugs that test found.
I'll also leave here some conclusions I found about speed
and resource consumption.
I made tests with default buffer sizes: 5000B, 15_000B
and 500_000B. I noticed that, with my hardware, with the
1 530 000 000B file, I was getting around:
In all buffers, with fake work: 10~15s of speed
improvement (from 90% HDD speed to 100% HDD speed).
In all buffers, with no fake work: 1~2s of speed
improvement (from 90% HDD speed to 100% HDD speed).
Changing the buffer size gave different reading speeds,
but both implementations were quite equal in how much
they changed with the buffer size.
Finally, I could always confirm that I/O was the slowest
thing while this code was running.
For those wondering about the file size: it is both to
avoid the OS cache and to do the reading at the main use
case these objects are for (large streams of bytes).
@Pavel, are you open for discussion now ;)? Need anything
else?
On 21/10/2016 19:21, Pavel Rappo wrote:
Just to append to my previous email: BufferedReader wraps
any Reader out there, not specifically FileReader, while
you're talking about the case of effective reading from a
file.
I guess there's one existing possibility to provide
exactly what you need (as I understand it) under this
method:
/**
 * Opens a file for reading, returning a {@code BufferedReader} to read text
 * from the file in an efficient manner...
 ...
 */
java.nio.file.Files#newBufferedReader(java.nio.file.Path)
It can return _anything_ as long as it is a
BufferedReader. We can do it, but it needs to be
investigated not only for your favorite OS but for other
OSes as well. Feel free to prototype this and we can
discuss it on the list later.
Thanks,
-Pavel
On 21 Oct 2016, at 18:56, Brunoais <brunoa...@gmail.com> wrote:
Pavel is right.
In reality, I was expecting such a BufferedReader to use
only a single buffer and have that buffer filled
asynchronously, not in a different thread. Additionally,
I don't intend to use a larger buffer than before unless
stated through the API (the constructor).
In my idea, internally, it is supposed to use
java.nio.channels.AsynchronousFileChannel or equivalent.
That does not prevent having two buffers, and I do not
intend to change BufferedReader itself. I'd do a
BufferedAsyncReader of sorts (any name suggestion is
welcome, as I'm an awful namer).
On 21/10/2016 18:38, Roger Riggs wrote:
Hi Pavel,
I think Brunoais is asking for a double buffering scheme
in which the implementation of BufferedReader fills (a
second buffer) in parallel with the application reading
from the 1st buffer, managing the swaps and async reads
transparently.
It would not change the API but would change the
interactions between the buffered reader and the
underlying stream. It would also increase memory
requirements and processing by introducing or using a
separate thread and the necessary synchronization.
Though I think the formal interface semantics could be
maintained, I have doubts about compatibility and its
unintended consequences on existing subclasses,
applications and libraries.
$.02, Roger
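The double-buffering scheme described here could be sketched roughly like this (a hypothetical illustration with made-up names, not the proposed patch): a daemon thread fills byte[] chunks ahead of the reader and hands them over through a small queue, so swaps and synchronization stay hidden behind the InputStream API:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DoubleBufferedInput extends InputStream {
    // Filled buffers travel from the filler thread to the reader; an empty
    // array is the EOF (or error) marker. Capacity 2 keeps one buffer being
    // drained while the next one is being filled.
    private final BlockingQueue<byte[]> filled = new ArrayBlockingQueue<>(2);
    private byte[] current = new byte[0];
    private int pos;
    private boolean eof;
    private volatile IOException pending;

    public DoubleBufferedInput(InputStream in, int bufSize) {
        Thread filler = new Thread(() -> {
            try {
                while (true) {
                    byte[] buf = new byte[bufSize];
                    int n = in.read(buf);
                    if (n < 0) {
                        filled.put(new byte[0]); // EOF marker
                        return;
                    }
                    filled.put(n == bufSize ? buf : Arrays.copyOf(buf, n));
                }
            } catch (IOException e) {
                pending = e;
                try {
                    filled.put(new byte[0]); // wake the reader so it sees the error
                } catch (InterruptedException ignored) {
                }
            } catch (InterruptedException ignored) {
            }
        });
        filler.setDaemon(true);
        filler.start();
    }

    @Override
    public int read() throws IOException {
        if (eof) {
            return -1;
        }
        while (pos == current.length) {
            try {
                current = filled.take(); // blocks only if the filler is behind
                pos = 0;
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            if (current.length == 0) {
                eof = true;
                if (pending != null) {
                    throw pending; // surface the filler thread's failure
                }
                return -1;
            }
        }
        return current[pos++] & 0xff;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new DoubleBufferedInput(
                new ByteArrayInputStream("hello".getBytes()), 2);
        int c;
        while ((c = in.read()) != -1) {
            System.out.print((char) c);
        }
        System.out.println();
    }
}
```

This also illustrates Roger's cost concern: the extra thread, the extra buffer copies, and the queue synchronization are all paid even when the caller wouldn't benefit.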
On 10/21/16 1:22 PM, Pavel Rappo wrote:
Off the top of my head, I would say it's not possible to
change the design of an _extensible_ type that has been
out there for 20 or so years. All these I/O streams from
java.io were designed for the simple synchronous use
case.
It's not that their design is flawed in some way; it's
that it doesn't seem to suit your needs. Have you
considered using java.nio.channels.AsynchronousFileChannel
in your applications?
-Pavel
On 21 Oct 2016, at 17:08, Brunoais <brunoa...@gmail.com> wrote:
Any feedback on this?
I'm really interested in implementing such a
BufferedReader/BufferedStreamReader to allow speeding up
my applications without having to think in an
asynchronous way, or in multi-threading, while
programming with it. That's why I'm asking this here.
On 13/10/2016 14:45, Brunoais wrote:
Hi,
I looked at the BufferedReader source code for Java 9
along with the source code of the channels/streams it
uses. I noticed that, like in Java 7, BufferedReader does
not use an async API to load data from files; instead,
the data loading is all done synchronously, even when the
OS allows requesting a file to be read and getting a
notification later when the file has effectively been
read.
Why is BufferedReader not async while providing a sync
API?
<BufferedNonBlockStream.java><Tests.java>
-- Sent from my phone
package org.sample.BufferedNon_BlockIO;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.locks.LockSupport;
public class BufferedAsyncStream extends BufferedInputStream {
private static final int DEFAULT_BUFFER_SIZE = 8_192;
/**
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
private static final int MIN_READ_BUFFER_SIZE = 10;
/**
* The maximum size of the read buffer if set automatically.
* Unless explicitly told, there's no need to starve the VM out of memory
*/
private static final int MAX_READ_BUFFER_SIZE = 1_000_000;
/**
 * The virtual index of the next position where data will be read.
 * Every operation requires executing a mathematical mod ({@code %}) operation
 * on it against <code>buf.length</code> to get the correct buffer position to work on.
 */
protected long readPos;
/**
 * The virtual index of the next position where data will be written.
 * Every operation requires executing a mathematical mod ({@code %}) operation
 * on it against <code>buf.length</code> to get the correct buffer position to work on.
 *
 * At all times, {@code readPos <= writePos} must hold.
 */
// Note to reviewers: What to do when long overflows? It will happen, but does a
// documented warning suffice or... do I need to work with the overflows?
protected long writePos;
ChannelReader readFrom;
ByteBuffer readBuffer;
FileReadCallback<Object> readCallback;
IOException nextException;
transient boolean isReading;
Thread parkingThread;
boolean isClosed;
boolean eof;
interface ChannelReader extends Closeable{
void read(ByteBuffer dst);
}
class AsyncFileChannelReader implements ChannelReader, CompletionHandler<Integer, ByteBuffer> {
AsynchronousFileChannel inChannel;
long currentPosition;
AsyncFileChannelReader(AsynchronousFileChannel inChannel) {
this.inChannel = inChannel;
currentPosition = 0;
}
@Override
public void read(ByteBuffer dst) {
isReading = true;
inChannel.read(dst, currentPosition, dst, this);
}
@Override
public void close() throws IOException{
inChannel.close();
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(result > 0){
currentPosition += result;
}
readCallback.completed(result, attachment);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
readCallback.failed(exc, attachment);
}
}
class AsyncByteChannelReader implements ChannelReader{
AsynchronousByteChannel inChannel;
AsyncByteChannelReader(AsynchronousByteChannel inChannel) {
this.inChannel = inChannel;
}
@Override
public void read(ByteBuffer dst) {
isReading = true;
inChannel.read(dst, dst, readCallback);
}
@Override
public void close() throws IOException {
inChannel.close();
}
}
class FileReadCallback<A> implements CompletionHandler<Integer, A> {
    @Override
    public void completed(Integer resultI, A attachment) {
        // unbox
        int result = resultI;
        if (result == -1) {
            eof = true;
        }
        long freeBufSpace = writeSpaceAvailable();
        if (freeBufSpace < readBuffer.position()) {
            // Not enough space in buf to copy from readBuffer.
            // Just send it back to fill even more
            readFrom.read(readBuffer);
            return;
        }
        if (readBuffer.position() > readBuffer.capacity() / 2
                || (eof && readBuffer.position() > 0)) {
            readBuffer.flip();
            // Ints are faster and an array can only be defined using an int
            int writeBufferFrom = (int) (writePos % buf.length);
            int canReadAmount = Math.min((int) freeBufSpace, readBuffer.remaining());
            int canWriteUntil = (writeBufferFrom + canReadAmount) % buf.length;
            // This can only be done like this because readBufferSize is smaller
            // than bufferSize and overflows are ignored
            if (canWriteUntil < writeBufferFrom) {
                // Read in 2 parts
                readBuffer.get(buf, writeBufferFrom, buf.length - writeBufferFrom);
                readBuffer.get(buf, 0, canWriteUntil);
            } else {
                readBuffer.get(buf, writeBufferFrom, canWriteUntil - writeBufferFrom);
            }
            writePos += canReadAmount;
            // Reset the buffer for more reading
            readBuffer.clear();
            LockSupport.unpark(parkingThread);
        }
        if (eof) {
            isReading = false;
            return;
        }
        readFrom.read(readBuffer);
    }

    @Override
    public void failed(Throwable exc, A attachment) {
        if (exc instanceof IOException) {
            nextException = (IOException) exc;
        } else {
            nextException = new IOException("Failed to read more data to the buffer", exc);
        }
        isReading = false;
    }
}
public BufferedAsyncStream(InputStream inputStream) {
super(inputStream);
}
public BufferedAsyncStream(AsynchronousFileChannel inputChannel) {
this(inputChannel, DEFAULT_BUFFER_SIZE);
}
public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize) {
    this(inputChannel, bufferSize,
            Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
}

public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize, int readBufferSize) {
    super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
    // Objects.requireNonNull(inputChannel, "The file channel must not be null");
    init(new AsyncFileChannelReader(inputChannel),
            Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
}

public BufferedAsyncStream(AsynchronousByteChannel inputChannel) {
    this(inputChannel, DEFAULT_BUFFER_SIZE);
}

public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize) {
    this(inputChannel, bufferSize,
            Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
}

public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize, int readBufferSize) {
    super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
    // Objects.requireNonNull(inputChannel, "The byte channel must not be null");
    init(new AsyncByteChannelReader(inputChannel),
            Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
}

private void init(ChannelReader inputChannel, int bufferSize, int readBufferSize) {
    this.readFrom = inputChannel;
    if (readBufferSize < 10) {
        throw new IllegalArgumentException("Read buffer must be at least 10 bytes");
    }
    if (readBufferSize > bufferSize) {
        throw new IllegalArgumentException("ReadBufferSize must be smaller than bufferSize");
    }
    // The read buffer must not be larger than the internal buffer.
    readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
    this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
    this.readCallback = new FileReadCallback<>();
    isClosed = false;
    readFrom.read(readBuffer);
}
int readSpaceAvailable() {
    // This can be an int because buf size can never be larger than an int
    return (int) (writePos - readPos);
}
int writeSpaceAvailable(){
return buf.length - readSpaceAvailable();
}
/**
 * Blocks waiting for a write.
 * After leaving the block, it is guaranteed that at least 1 byte is readable
 * from the buffer.
 * @throws IOException
 */
// This is /* synchronized */ to make sure only up to 1 thread is parked inside
void waitForFill() throws IOException {
    while (readSpaceAvailable() < 1 && !eof) {
        try {
            parkingThread = Thread.currentThread();
            // wait for 1 second tops. File/web reading really should be faster than that
            LockSupport.parkNanos(this, 1_000_000_000);
        } finally {
            parkingThread = null;
        }
    }
}
}
private int readingFormalities() throws IOException {
if(!isReading){
if(nextException != null){
try{
throw nextException;
} finally {
nextException = null;
}
}
if(eof){
return -1;
}
readFrom.read(readBuffer);
}
return readSpaceAvailable();
}
@Override
public /* synchronized */ int read() throws IOException {
    if (readFrom == null) {
        return super.read();
    }
    switch (readingFormalities()) {
        case -1:
            return -1;
        case 0:
            waitForFill();
            if (eof && readSpaceAvailable() == 0) {
                return -1; // EOF raced with the wait; nothing left to read
            }
            /* No Default */
    }
    return buf[(int) (readPos++ % buf.length)] & 0xff;
}
@Override
public /* synchronized */ int read(byte[] b, int off, int len) throws IOException {
    if (readFrom == null) {
        return super.read(b, off, len);
    }
    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        return 0;
    }
    switch (readingFormalities()) {
        case -1:
            return -1;
        case 0:
            waitForFill();
            if (eof && readSpaceAvailable() == 0) {
                return -1; // EOF raced with the wait; nothing left to read
            }
            /* No Default */
    }
    // Note for reviewing: Some optimizations that take some time were not made here.
    // E.g.
    // -> If b consumes the whole buf, copy directly from the ByteBuffer to b
    // -> As long as the read buffers are large enough (requires benchmarking),
    //    do not use the buf array at all (late initialization too)
    int bufferFrom = (int) (readPos % buf.length);
    int canReadAmount = Math.min(readSpaceAvailable(), len);
    int bufferUntil = (bufferFrom + canReadAmount) % buf.length;
    if (bufferUntil < bufferFrom) {
        // The readable region wraps around the end of buf: copy in 2 parts
        System.arraycopy(buf, bufferFrom, b, off, buf.length - bufferFrom);
        System.arraycopy(buf, 0, b, off + (buf.length - bufferFrom), bufferUntil);
    } else {
        System.arraycopy(buf, bufferFrom, b, off, canReadAmount);
    }
    readPos += canReadAmount;
    return canReadAmount;
}
@Override
public long skip(long arg0) throws IOException {
    if (readFrom == null) {
        return super.skip(arg0);
    }
    // There's no skip in this poc.
    // For the real implementation, I'd do (while I didn't skip enough):
    //   just "jump" readPos accordingly (while keeping readPos <= writePos)
    //   then reposition (.position(int)) in the buffer OR clear the buffer
    //   then, if SeekableChannel -> use (sort of) .position(.position() + skipLeft)
    //   else -> do not seek.
    throw new UnsupportedOperationException();
}
@Override
public int available() throws IOException {
if(readFrom == null){
return super.available();
}
return readSpaceAvailable();
}
@Override
public void mark(int arg0) {
if(readFrom == null){
super.mark(arg0);
}
// This is not hard to do but it requires quite some time!
// I'll wait first if the poc passes without this method
}
@Override
public void reset() throws IOException {
    if (readFrom == null) {
        super.reset();
        return;
    }
    throw new IOException("The mark was lost");
}
@Override
public boolean markSupported() {
if(readFrom == null){
return super.markSupported();
}
// false for now, at least.
return false;
}
@Override
public void close() throws IOException {
if(readFrom != null){
readFrom.close();
}
super.close();
}
}
package org.sample.BufferedNon_BlockIO;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
/**
* A sample file reading benchmark
*/
@BenchmarkMode(Mode.SingleShotTime)
@Fork(value = 1)
@Warmup(iterations = 0, time = 2, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2)
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class BufferedNonBlockStreamBenchmark_try1 {
public static boolean isWindows =
System.getProperty("os.name").contains("Windows");
@Param({"1gig_file"})
String file;
// @Param({"4100", "16400", "131100", "1048600"})
@Param({"4100", "131100", "1048600"})
int javaBufSize;
@Param({"100", "1000"})
int readSize;
// @Param({"0", "1000", "100000"})
@Param({"0", "1000"})
long cpuWork;
// This MUST be smaller than javaBufSize
// @Param({"4096", "16384", "131072", "1048576"})
@Param({"BufferedInputStream", "BufferedNonBlockStream|4096",
        "BufferedNonBlockStream|131072", "BufferedNonBlockStream|1048576",
        "BufferedAsyncStream|4096", "BufferedAsyncStream|131072",
        "BufferedAsyncStream|1048576"})
String implType;
interface ReadOp {
int read(byte[] buf) throws IOException;
}
private ReadOp read;
private Closeable close;
private byte[] buf;
@Setup(Level.Iteration)
public void setup() throws IOException, InterruptedException {
clearCache();
String[] type_readSize = implType.split("\\|");
switch (type_readSize[0]) {
case "BufferedInputStream":{
BufferedInputStream in = new BufferedInputStream(
new FileInputStream(file), javaBufSize);
read = in::read;
close = in::close;
}
break;
case "BufferedNonBlockStream": {
BufferedNonBlockStream in = new BufferedNonBlockStream(
FileChannel.open(new File(file).toPath()),
javaBufSize, Integer.parseInt(type_readSize[1]));
read = in::read;
close = in::close;
}
break;
case "BufferedAsyncStream": {
BufferedAsyncStream in = new BufferedAsyncStream(
AsynchronousFileChannel.open(new
File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
read = in::read;
close = in::close;
}
break;
default:
throw new IllegalArgumentException(
"Invalid parameter 'implType': " + implType);
}
buf = new byte[readSize];
}
@TearDown(Level.Iteration)
public void tearDown() throws IOException {
close.close();
}
/**
*
* For linux:
*
* Compile the following C program into clear_cache executable:
* <pre>{@code
*
* #include <stdio.h>
* #include <string.h>
* #include <errno.h>
*
* #define PROC_FILE "/proc/sys/vm/drop_caches"
*
* int main(int argc, char *argv[]) {
* FILE *f;
* f = fopen(PROC_FILE, "w");
* if (f) {
* fprintf(f, "3\n");
* fclose(f);
* return 0;
* } else {
* fprintf(stderr, "Can't write to: %s: %s\n", PROC_FILE,
strerror(errno));
* return 1;
* }
* }
*
* }</pre>
* ... and make it SUID root!
*
* For windows:
* Use the provided clear_cache.exe
* (source code and original author:)
* https://gist.github.com/bitshifter/c87aa396446bbebeab29
* Then run this program with administrator privileges
*/
private static void clearCache() throws IOException, InterruptedException {
// spawn an OS command to clear the FS cache...
if(isWindows){
new ProcessBuilder("clear_cache.exe").start().waitFor();
} else {
new ProcessBuilder("clear_cache").start().waitFor();
}
}
@Benchmark
public int testRead() throws IOException {
int nread = 0;
int n;
while ((n = read.read(buf)) >= 0) {
nread += n;
Blackhole.consumeCPU(cpuWork);
}
return nread;
}
}