Attached and sending!
On 24/10/2016 13:48, Pavel Rappo wrote:
Could you please send a new email on this list with the source attached as a
text file?
On 23 Oct 2016, at 19:14, Brunoais <brunoa...@gmail.com> wrote:
Here's my poc/prototype:
http://pastebin.com/WRpYWDJF
I've implemented the bare minimum of the class that follows the same contract
of BufferedReader while signaling all issues I think it may have or has in
comments.
I also wrote some javadoc to help guide readers through the class.
I could have used more fields from BufferedReader but the names were so
minimalistic that they were confusing me. I intend to change them before sending
this to openJDK.
One of the major problems this has is overflow of the long position counters.
It is major because it is hidden: it will be extremely rare and it takes a
really long time to reproduce. There are different ways of dealing with it,
from just documenting it to actually writing code that handles it.
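One possible way of "making code that works with it" is sketched below. It is not taken from the prototype: the class name RingPositions and its methods are hypothetical, and it assumes the buffer capacity is a power of two. Under that assumption the difference writePos - readPos stays correct when a long wraps around, and masking with capacity - 1 (instead of %, which returns a negative value for a negative position) gives an index that continues seamlessly across the wrap.

```java
/** Hypothetical helper, not part of the posted prototype. */
final class RingPositions {
    private final int capacity; // assumed to be a power of two
    private final int mask;     // capacity - 1
    private long readPos;
    private long writePos;

    RingPositions(int capacity, long start) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.capacity = capacity;
        this.mask = capacity - 1;
        this.readPos = start;
        this.writePos = start;
    }

    /** Bytes ready to be consumed; two's complement keeps this correct across overflow. */
    int readable() {
        return (int) (writePos - readPos);
    }

    int writable() {
        return capacity - readable();
    }

    /** Masking never yields a negative index, unlike % on a negative long. */
    int readIndex() {
        return (int) (readPos & mask);
    }

    int writeIndex() {
        return (int) (writePos & mask);
    }

    void produced(int n) {
        if (n < 0 || n > writable()) {
            throw new IllegalArgumentException("n out of range");
        }
        writePos += n;
    }

    void consumed(int n) {
        if (n < 0 || n > readable()) {
            throw new IllegalArgumentException("n out of range");
        }
        readPos += n;
    }

    public static void main(String[] args) {
        // Start just below Long.MAX_VALUE to force the overflow immediately
        RingPositions p = new RingPositions(8, Long.MAX_VALUE - 2);
        p.produced(5); // writePos wraps around to a negative value here
        System.out.println(p.readable());   // 5
        System.out.println(p.readIndex());  // 5
        System.out.println(p.writeIndex()); // 2
    }
}
```

For a capacity that is not a power of two the index would jump at the wrap, so this approach would constrain the buffer sizes the constructor accepts. For scale, at a sustained 1 GB/s a signed long counter takes roughly 290 years to overflow.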
I built a simple test code for it to have some ideas about performance and
correctness.
http://pastebin.com/eh6LFgwT
This doesn't do a thorough test of whether it is actually working correctly, but
I see no reason for it not to work correctly after fixing the 2 bugs that test
found.
I'll also leave here some conclusions about speed and resource consumption I
found.
I ran tests with the default buffer sizes, 5000B, 15_000B and 500_000B. I noticed
that, with my hardware and the 1 530 000 000B file, I was getting around:
In all buffers and fake work: 10~15s speed improvement (from 90% HDD speed to
100% HDD speed)
In all buffers and no fake work: 1~2s speed improvement (from 90% HDD speed to
100% HDD speed)
Changing the buffer size gave different reading speeds, but both implementations
changed by about the same amount when the buffer size changed.
Finally, I could always confirm that I/O was always the slowest thing while
this code was running.
For those wondering about the file size: it is large both to avoid the OS cache
and to exercise the main use case these objects are meant for (large streams of
bytes).
@Pavel, are you open for discussion now ;)? Need anything else?
On 21/10/2016 19:21, Pavel Rappo wrote:
Just to append to my previous email. BufferedReader wraps any Reader out there.
Not specifically FileReader. While you're talking about the case of effective
reading from a file.
I guess there's one existing possibility to provide exactly what you need (as I
understand it) under this method:
/**
* Opens a file for reading, returning a {@code BufferedReader} to read text
* from the file in an efficient manner...
...
*/
java.nio.file.Files#newBufferedReader(java.nio.file.Path)
It can return _anything_ as long as it is a BufferedReader. We can do it, but it
needs to be investigated not only for your favorite OS but for other OSes as
well. Feel free to prototype this and we can discuss it on the list later.
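For reference, a minimal usage sketch of that factory method. The class name NewBufferedReaderExample and the helper countLines are made up for illustration; only Files.newBufferedReader and the other java.nio.file calls are real API. The caller only sees a BufferedReader, so whatever the factory returns underneath stays hidden behind the same contract.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical example class; only the java.nio.file calls are real API. */
final class NewBufferedReaderExample {

    /** Counts the lines of a text file through the reader returned by the factory. */
    static long countLines(Path file) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            long lines = 0;
            while (reader.readLine() != null) {
                lines++;
            }
            return lines;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("demo", ".txt");
        try {
            Files.write(tmp, Arrays.asList("first", "second", "third"), StandardCharsets.UTF_8);
            System.out.println(countLines(tmp)); // 3
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```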
Thanks,
-Pavel
On 21 Oct 2016, at 18:56, Brunoais <brunoa...@gmail.com> wrote:
Pavel is right.
In reality, I was expecting such BufferedReader to use only a single buffer and
have that Buffer being filled asynchronously, not in a different Thread.
Additionally, I don't have the intention of having a larger buffer than before
unless stated through the API (the constructor).
In my idea, internally, it is supposed to use
java.nio.channels.AsynchronousFileChannel or equivalent.
It does not prevent having two buffers and I do not intend to change
BufferedReader itself. I'd do a BufferedAsyncReader of sorts (any name
suggestion is welcome as I'm an awful namer).
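To make the idea concrete, here is a rough sketch of reading a file through java.nio.channels.AsynchronousFileChannel with a single buffer. It is only an illustration: the class AsyncReadSketch and the method sumBytes are hypothetical and are not the proposed BufferedAsyncReader. The read is requested, the caller is free to do unrelated work, and the result is collected later through the returned Future.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

/** Hypothetical sketch; not the BufferedAsyncReader discussed in the thread. */
final class AsyncReadSketch {

    /** Sums all bytes of the file (as unsigned values), one asynchronous read at a time. */
    static long sumBytes(Path file, int bufferSize) throws Exception {
        try (AsynchronousFileChannel channel =
                AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            long position = 0;
            long sum = 0;
            while (true) {
                // Request the read; the call returns without waiting for the data
                Future<Integer> pending = channel.read(buffer, position);
                // ... the caller could do unrelated work here ...
                int n = pending.get(); // wait for the read to complete
                if (n < 0) {
                    break; // end of file
                }
                position += n;
                buffer.flip();
                while (buffer.hasRemaining()) {
                    sum += buffer.get() & 0xff;
                }
                buffer.clear();
            }
            return sum;
        }
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("async", ".bin");
        try {
            Files.write(tmp, new byte[] {1, 2, 3, (byte) 250});
            System.out.println(sumBytes(tmp, 3)); // 256
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

With one buffer the pending read and the consumption of that same buffer cannot overlap, so the gain is limited to whatever other work fits between the request and the get() call.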
On 21/10/2016 18:38, Roger Riggs wrote:
Hi Pavel,
I think Brunoais is asking for a double buffering scheme in which the
implementation of BufferedReader fills a second buffer in parallel with the
application reading from the first buffer, and manages the swaps and async
reads transparently.
It would not change the API but would change the interactions between the
buffered reader and the underlying stream. It would also increase memory
requirements and processing by introducing or using a separate thread and the
necessary synchronization.
Though I think the formal interface semantics could be maintained, I have doubts
about compatibility and its unintended consequences on existing subclasses,
applications and libraries.
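A rough sketch of the double buffering scheme described above, for illustration only. The class DoubleBufferedSource and its methods next() and current() are hypothetical names, and the sketch deliberately does not extend BufferedReader. It shows the two costs mentioned: a second buffer and a helper thread, with the Future providing the synchronization between the two.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of double buffering; not a BufferedReader replacement. */
final class DoubleBufferedSource implements AutoCloseable {
    private final InputStream in;
    private final ExecutorService filler = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "buffer-filler");
        t.setDaemon(true);
        return t;
    });
    private byte[] front; // buffer the caller is consuming
    private byte[] back;  // buffer being filled in the background
    private Future<Integer> pending;

    DoubleBufferedSource(InputStream in, int bufferSize) {
        this.in = in;
        this.front = new byte[bufferSize];
        this.back = new byte[bufferSize];
        startFill();
    }

    private void startFill() {
        final byte[] target = back;
        Callable<Integer> task = () -> in.read(target, 0, target.length);
        pending = filler.submit(task);
    }

    /**
     * Waits for the background fill, swaps the buffers and starts the next fill.
     * Returns the number of valid bytes in current(), or -1 at end of stream.
     */
    int next() throws IOException {
        int n;
        try {
            n = pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for data", e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
        byte[] filled = back;
        back = front;
        front = filled;
        if (n >= 0) {
            startFill(); // the old front buffer is refilled while the caller works
        }
        return n;
    }

    byte[] current() {
        return front;
    }

    @Override
    public void close() throws IOException {
        filler.shutdownNow();
        in.close();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        try (DoubleBufferedSource src = new DoubleBufferedSource(new ByteArrayInputStream(data), 4)) {
            int total = 0;
            int n;
            while ((n = src.next()) != -1) {
                total += n; // process src.current()[0..n) here
            }
            System.out.println(total); // 10
        }
    }
}
```

The caller must finish with current() before calling next() again, because the buffer it was reading becomes the one being filled.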
$.02, Roger
On 10/21/16 1:22 PM, Pavel Rappo wrote:
Off the top of my head, I would say it's not possible to change the design of an
_extensible_ type that has been out there for 20 or so years. All these I/O
streams from java.io were designed for simple synchronous use case.
It's not that their design is flawed in some way, it's that they don't seem to
suit your needs. Have you considered using
java.nio.channels.AsynchronousFileChannel
in your applications?
-Pavel
On 21 Oct 2016, at 17:08, Brunoais <brunoa...@gmail.com> wrote:
Any feedback on this? I'm really interested in implementing such
BufferedReader/BufferedStreamReader to allow speeding up my applications
without having to think in an asynchronous way or multi-threading while
programming with it.
That's why I'm asking this here.
On 13/10/2016 14:45, Brunoais wrote:
Hi,
I looked at the BufferedReader source code for Java 9 along with the source code
of the channels/streams used. I noticed that, like in Java 7, BufferedReader
does not use an async API to load data from files. Instead, the data loading is
all done synchronously, even when the OS allows requesting a file to be read and
getting a notification later when the file has actually been read.
Why is BufferedReader not async while providing a sync API?
package pocs.java;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;
public class BufferedNonBlockStream extends BufferedInputStream {

    private static final int DEFAULT_BUFFER_SIZE = 8_192;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

    private static final int MIN_READ_BUFFER_SIZE = 10;

    /**
     * The maximum size of the read buffer if set automatically.
     * Unless explicitly told, there's no need to starve the VM out of memory
     */
    private static final int MAX_READ_BUFFER_SIZE = 1_000_000;

    /**
     * The virtual index of the next position where data will be read.
     * Every operation requires executing a mathematical mod ({@code %})
     * operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     */
    protected long readPos;

    /**
     * The virtual index of the next position where data will be written.
     * Every operation requires executing a mathematical mod ({@code %})
     * operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     *
     * At all times, {@code readPos <= writePos} must be true
     */
    // Note to reviewers: What to do when long overflows? It will happen but
    // does a documented warning suffice or... do I need to work with the overflows?
    protected long writePos;

    ReadableByteChannel inputChannel;
    ByteBuffer readBuffer;
    boolean isClosed;
    boolean eof;

    public BufferedNonBlockStream(InputStream inputStream) {
        super(inputStream);
    }

    public BufferedNonBlockStream(ReadableByteChannel inputChannel) {
        this(inputChannel, DEFAULT_BUFFER_SIZE);
    }

    public BufferedNonBlockStream(ReadableByteChannel inputChannel, int bufferSize) {
        this(inputChannel, bufferSize,
                Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
    }

    public BufferedNonBlockStream(ReadableByteChannel inputChannel, int bufferSize, int readBufferSize) {
        super(null, bufferSize);
        Objects.requireNonNull(inputChannel, "The byte channel must not be null");
        this.inputChannel = inputChannel;
        if (readBufferSize < 1) {
            throw new IllegalArgumentException("Read buffer must be at least 1 byte");
        }
        // The read buffer must not be larger than the internal buffer.
        if (readBufferSize > bufferSize) {
            throw new IllegalArgumentException("readBufferSize must not be larger than bufferSize");
        }
        // Stay below the largest array size the VM can allocate.
        readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
        this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
        isClosed = false;
    }

    int readSpaceAvailable() {
        // This can be an int because buf size can never be larger than an int
        return (int) (writePos - readPos);
    }

    int writeSpaceAvailable() {
        return buf.length - readSpaceAvailable();
    }

    void fill() throws IOException {
        fill(false);
    }

    void blockingFill(boolean forced) throws IOException {
        fill(forced);
        // Stop polling at end of stream; otherwise this loop would never exit
        while (readPos == writePos && !eof) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // An interrupt may mean more data is available
            }
            fill(forced);
        }
    }

    synchronized void fill(boolean forced) throws IOException {
        isClosed = !inputChannel.isOpen();
        eof = inputChannel.read(readBuffer) == -1;
        // For the poc, markers are not coded
        long freeBufSpace = writeSpaceAvailable();
        if ((forced && readBuffer.position() > 0) ||
                // It's no use reading a very small quantity from the buffer
                // Please review that.
                (freeBufSpace > readBuffer.capacity() / 2 && freeBufSpace >= readBuffer.position())) {
            readBuffer.flip();
            // Ints are faster and an array can only be indexed using an int
            int writeBufferFrom = (int) (writePos % buf.length);
            int canReadAmount = Math.min((int) freeBufSpace, readBuffer.remaining());
            // Copy in up to 2 parts: up to the end of buf, then wrapping to its start
            int firstPart = Math.min(canReadAmount, buf.length - writeBufferFrom);
            readBuffer.get(buf, writeBufferFrom, firstPart);
            if (firstPart < canReadAmount) {
                readBuffer.get(buf, 0, canReadAmount - firstPart);
            }
            writePos += canReadAmount;
            // Keep any bytes that did not fit and reset the buffer for more reading
            readBuffer.compact();
        }
    }

    @Override
    public int read() throws IOException {
        if (inputChannel == null) {
            return super.read();
        }
        fill(false);
        if (readPos == writePos) {
            // Get anything that is in the buffer
            blockingFill(true);
            if (readPos == writePos) {
                // Still empty after a blocking fill: the stream has ended
                return -1;
            }
        }
        return buf[(int) (readPos++ % buf.length)] & 0xff;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (inputChannel == null) {
            return super.read(b, off, len);
        }
        // Always fill. Even when rejecting
        fill();
        if (eof && readSpaceAvailable() == 0) {
            // Buffer emptied and the stream ended
            return -1;
        }
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        // For a ReadableChannel, assume that it has always read as much as it could
        if (readSpaceAvailable() == 0) {
            fill(true);
            if (readSpaceAvailable() == 0) {
                return eof ? -1 : 0;
            }
        }
        // Note for reviewing: Some optimizations that take some time were not made here.
        // E.g.
        // -> If b consumes the whole buff, copy directly from readBuffer to this buff
        // -> As long as reading buffs are large enough (requires benchmarking),
        //    do not use the buf array at all (late initialization too)
        int readBufferFrom = (int) (readPos % buf.length);
        // len is the number of bytes requested; off is only the offset into b
        int canReadAmount = Math.min(readSpaceAvailable(), len);
        // For reviewer: Should it always read once? Or should it read twice?
        // First part: from the read position up to the end of buf
        int firstPart = Math.min(canReadAmount, buf.length - readBufferFrom);
        System.arraycopy(buf, readBufferFrom, b, off, firstPart);
        if (firstPart < canReadAmount) {
            // Second part: the data wrapped around to the start of buf
            System.arraycopy(buf, 0, b, off + firstPart, canReadAmount - firstPart);
        }
        readPos += canReadAmount;
        return canReadAmount;
    }

    @Override
    public long skip(long arg0) throws IOException {
        if (inputChannel == null) {
            return super.skip(arg0);
        }
        // There's no skip in this poc
        // For the real implementation, I'd do (while I didn't skip enough):
        // I'd just "jump" the readPos accordingly (while keeping readPos <= writePos)
        // Then
        //   reposition (.position(int)) in buffer
        // OR
        //   clear the buffer
        // Then
        //   If SeekableChannel -> use (sort of) .position(.position() + skipLeft)
        //   else -> do not seek.
        throw new UnsupportedOperationException();
    }

    @Override
    public int available() throws IOException {
        if (inputChannel == null) {
            return super.available();
        }
        fill();
        // For now, just return what I am sure I have
        return readSpaceAvailable();
    }

    @Override
    public void mark(int arg0) {
        if (inputChannel == null) {
            super.mark(arg0);
            return;
        }
        // This is not hard to do but it requires quite some time!
        // I'll wait first if the poc passes without this method
    }

    @Override
    public void reset() throws IOException {
        if (inputChannel == null) {
            super.reset();
            // Without this return, the stream-backed case would always throw below
            return;
        }
        throw new IOException("The mark was lost");
    }

    @Override
    public boolean markSupported() {
        if (inputChannel == null) {
            return super.markSupported();
        }
        // false for now, at least.
        return false;
    }

    @Override
    public void close() throws IOException {
        if (inputChannel != null) {
            inputChannel.close();
        }
        super.close();
    }
}
package pocs.java;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
public class Tests {

    private static final int PROGRAM_BUFFER = 200;
    private static final int READ_BUFFER = 500_000;
    private static final boolean EXECUTE_WORK = true;

    // Writes 6_000_000 * 255 = 1_530_000_000 bytes, cycling through the values 0..254
    static void buildTestFile() throws Exception {
        try (BufferedOutputStream outputing = new BufferedOutputStream(
                new FileOutputStream("TestFile"), 30000)) {
            for (int i = 0; i < 6_000_000; i++) {
                for (int j = 0; j < 255; j++) {
                    outputing.write(j);
                }
            }
        }
    }

    static void theRead(BufferedInputStream inputing) throws Exception {
        byte[] myBuf = new byte[PROGRAM_BUFFER];
        int readAmount = 0;
        byte expectingByte = 0x00;
        int strikes = 0;
        int readCount = 0;
        while ((readAmount = inputing.read(myBuf)) != -1) {
            readCount++;
            for (int i = 0; i < readAmount; i++) {
                if (expectingByte == -1) {
                    // The file never contains 255 (-1 as a byte), so wrap back to 0
                    expectingByte = 0;
                }
                if (expectingByte != myBuf[i]) {
                    System.out.println("For byte " + (int) myBuf[i]
                            + " expected " + (int) expectingByte);
                    if (strikes++ > 50) {
                        return;
                    }
                }
                expectingByte += 1;
            }
            if (EXECUTE_WORK) {
                // doing work
                for (int i = 0; i < 30; i++) {
                    new CRC32().update(myBuf);
                }
            }
        }
    }

    static void normalRead() throws Exception {
        try (BufferedInputStream inputing = new BufferedInputStream(
                new FileInputStream("TestFile"), READ_BUFFER)) {
            theRead(inputing);
        }
    }

    static void myRead() throws Exception {
        try (BufferedInputStream inputing = new BufferedNonBlockStream(
                FileChannel.open(new File("TestFile").toPath()), READ_BUFFER)) {
            theRead(inputing);
        }
    }

    public static void main(String[] args) throws Exception {
        // buildTestFile();
        long start = System.currentTimeMillis();
        // normalRead();
        myRead();
        System.out.println("Time! " + (System.currentTimeMillis() - start));
    }
}