sunchao commented on code in PR #960:
URL: https://github.com/apache/parquet-mr/pull/960#discussion_r932674870
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -88,6 +136,21 @@ public long skip(long n) {
return bytesToSkip;
}
+ @Override
+ public void skipFully(long n) throws IOException {
+ if (n < 0 || n > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException();
Review Comment:
nit: can we add a useful message to the exception, something like
"Invalid input for skipFully: {}"?
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -38,6 +39,34 @@ class SingleBufferInputStream extends ByteBufferInputStream {
// duplicate the buffer because its state will be modified
this.buffer = buffer.duplicate();
this.startPosition = buffer.position();
+ this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+ }
+
+ SingleBufferInputStream(ByteBuffer buffer, int start, int length) {
+ // duplicate the buffer because its state will be modified
+ this.buffer = buffer.duplicate();
+ this.startPosition = start;
+ this.buffer.position(start);
+ this.buffer.limit(start + length);
+ this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+ }
+
+ SingleBufferInputStream(byte[] inBuf) {
+ this.buffer = ByteBuffer.wrap(inBuf);
+ this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+ this.startPosition = 0;
+ }
+
+ SingleBufferInputStream(byte[] inBuf, int start, int length) {
+ this.buffer = ByteBuffer.wrap(inBuf, start, length);
+ this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+ // This seems to be consistent with HeapByteBuffer.wrap(), which leaves
+ // the internal "offset" at zero and sets the starting position at start.
+ this.startPosition = 0;
+ }
+
+ SingleBufferInputStream(List<ByteBuffer> inBufs) {
Review Comment:
hmm, why do we need this constructor?
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -70,9 +105,22 @@ public int read(byte[] bytes, int offset, int length) throws IOException {
return bytesToRead;
}
-
+
+ @Override
+ public void readFully(byte[] bytes, int offset, int length) throws IOException {
+ try {
+ buffer.get(bytes, offset, length);
+ } catch (BufferUnderflowException|IndexOutOfBoundsException e) {
Review Comment:
nit: space before & after `|`
##########
parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java:
##########
@@ -379,4 +427,120 @@ public void remove() {
second.remove();
}
}
+
+ @Override
+ public byte readByte() throws IOException {
+ return (byte)readUnsignedByte();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
Review Comment:
this looks the same as `read`
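A minimal sketch of what that could look like, assuming `read()` follows the `InputStream` contract (next byte as 0-255, or -1 at end of stream): `readUnsignedByte()` delegates to it and only adds the `EOFException`. `DelegatingByteReader` is a hypothetical single-buffer stand-in, not the PR's `MultiBufferInputStream`.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical stand-in stream; only here to show the delegation pattern.
class DelegatingByteReader extends InputStream {
  private final ByteBuffer buffer;

  DelegatingByteReader(ByteBuffer buffer) {
    this.buffer = buffer.duplicate();
  }

  @Override
  public int read() {
    // InputStream contract: next byte as 0..255, or -1 at end of stream
    return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
  }

  // Same as read(), except that reaching end of stream is an error
  public int readUnsignedByte() throws IOException {
    int b = read();
    if (b < 0) {
      throw new EOFException("Reached end of stream while reading a byte");
    }
    return b;
  }

  public byte readByte() throws IOException {
    return (byte) readUnsignedByte();
  }
}
```

With this shape, `readByte()` stays a one-line cast and the end-of-stream handling lives in one place.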
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -88,6 +136,21 @@ public long skip(long n) {
return bytesToSkip;
}
+ @Override
+ public void skipFully(long n) throws IOException {
+ if (n < 0 || n > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException();
+ }
+
+ try {
+ buffer.position(buffer.position() + (int)n);
+ } catch (IllegalArgumentException e) {
Review Comment:
Instead of try/catch, can we check whether the new position is greater than
`buffer.limit()` and throw an `EOFException` if so?
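A rough sketch of the bounds-check approach, written as a hypothetical static helper over a plain `ByteBuffer` rather than the PR's method. Comparing `n` with `remaining()` is equivalent to checking `position() + n > limit()`, and doing the comparison in `long` arithmetic avoids `int` overflow:

```java
import java.io.EOFException;
import java.nio.ByteBuffer;

// Hypothetical helper illustrating "check, then move" instead of try/catch.
final class SkipFullySketch {
  private SkipFullySketch() {}

  // Advances the buffer by exactly n bytes, or throws without moving it.
  static void skipFully(ByteBuffer buffer, long n) throws EOFException {
    if (n < 0) {
      throw new IllegalArgumentException("Invalid input for skipFully: " + n);
    }
    // Same as position() + n > limit(), compared as long so it cannot overflow
    if (n > buffer.remaining()) {
      throw new EOFException(
          "Not enough bytes to skip: " + buffer.remaining() + " < " + n);
    }
    // Safe cast: n <= remaining() <= Integer.MAX_VALUE here
    buffer.position(buffer.position() + (int) n);
  }
}
```

Because `remaining()` is an `int`, this also covers `n > Integer.MAX_VALUE`, but reports it as EOF rather than as an invalid argument; whether that is preferable is a judgment call.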
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -174,4 +254,64 @@ public boolean markSupported() {
public int available() {
return buffer.remaining();
}
+
+ @Override
+ public byte readByte() throws IOException {
+ try {
+ return buffer.get();
+ } catch (BufferUnderflowException e) {
+ throw new EOFException(e.getMessage());
+ }
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ try {
+ return buffer.get() & 0xFF;
+ } catch (BufferUnderflowException e) {
+ throw new EOFException(e.getMessage());
+ }
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ try {
+ return buffer.getShort();
+ } catch (BufferUnderflowException e) {
+ throw new EOFException(e.getMessage());
+ }
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ try {
+ return buffer.getShort() & 0xFFFF;
+ } catch (BufferUnderflowException e) {
+ throw new EOFException(e.getMessage());
+ }
+ }
+
+ /*
+ Use ByteBuffer.getInt(), which takes advantage of platform intrinsics
+ */
+ @Override
+ public int readInt() throws IOException {
+ try {
+ return buffer.getInt();
+ } catch (BufferUnderflowException e) {
+ throw new EOFException(e.getMessage());
+ }
+ }
+
+ /*
+ Use ByteBuffer.getLonmg(), which takes advantage of platform intrinsics
Review Comment:
nit: typo (`getLonmg` should be `getLong`)
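Side note on the `readInt()`/`readLong()` overrides above: they rely on the constructors setting `ByteOrder.LITTLE_ENDIAN`, so that `ByteBuffer.getInt()`/`getLong()` read bytes least-significant first. A small JDK-only sketch comparing the buffer methods with manual little-endian byte assembly; the helper names are illustrative, not Parquet APIs:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative helpers only; not part of parquet-mr.
final class LittleEndianSketch {
  private LittleEndianSketch() {}

  // Manual little-endian decode: byte at off is the least significant byte
  static int manualInt(byte[] b, int off) {
    return (b[off] & 0xFF)
        | (b[off + 1] & 0xFF) << 8
        | (b[off + 2] & 0xFF) << 16
        | (b[off + 3] & 0xFF) << 24;
  }

  static long manualLong(byte[] b, int off) {
    long result = 0;
    // Walk from the most significant byte (off + 7) down to off
    for (int i = 7; i >= 0; i--) {
      result = (result << 8) | (b[off + i] & 0xFFL);
    }
    return result;
  }

  // What getInt()/getLong() return once the buffer order is LITTLE_ENDIAN
  static int bufferInt(byte[] b, int off) {
    return ByteBuffer.wrap(b).order(ByteOrder.LITTLE_ENDIAN).getInt(off);
  }

  static long bufferLong(byte[] b, int off) {
    return ByteBuffer.wrap(b).order(ByteOrder.LITTLE_ENDIAN).getLong(off);
  }
}
```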
##########
parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java:
##########
@@ -89,6 +91,15 @@ public long skip(long n) {
return bytesSkipped;
}
+ @Override
+ public void skipFully(long n) throws IOException {
+ if (current == null || n > length) {
+ throw new EOFException();
Review Comment:
ditto: add some info to the exception; better to retain the original message:
`"Not enough bytes to skip: " + skipped + " < " + n`
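A sketch of a `skipFully` that keeps that message, written as a hypothetical helper over any `InputStream` rather than the PR's `MultiBufferInputStream` method. It assumes a zero-byte `skip` means end of stream, which holds for in-memory streams such as `ByteArrayInputStream` but not for every `InputStream`:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper; shows how "skipped" and "n" end up in the message.
final class SkipFullyMessageSketch {
  private SkipFullyMessageSketch() {}

  static void skipFully(InputStream in, long n) throws IOException {
    long skipped = 0;
    while (skipped < n) {
      long step = in.skip(n - skipped);
      if (step <= 0) {
        break; // assumption: no progress means end of stream
      }
      skipped += step;
    }
    if (skipped < n) {
      throw new EOFException("Not enough bytes to skip: " + skipped + " < " + n);
    }
  }
}
```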
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -38,6 +39,34 @@ class SingleBufferInputStream extends ByteBufferInputStream {
// duplicate the buffer because its state will be modified
this.buffer = buffer.duplicate();
this.startPosition = buffer.position();
+ this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+ }
+
+ SingleBufferInputStream(ByteBuffer buffer, int start, int length) {
+ // duplicate the buffer because its state will be modified
+ this.buffer = buffer.duplicate();
+ this.startPosition = start;
+ this.buffer.position(start);
+ this.buffer.limit(start + length);
+ this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+ }
+
+ SingleBufferInputStream(byte[] inBuf) {
Review Comment:
it seems these 3 constructors are not used; can we at least add some tests
to cover them?
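Because `SingleBufferInputStream` is package-private, such tests would need to live in `org.apache.parquet.bytes`. As a hedged illustration of what they could assert, the sketch below repeats the setup the `(ByteBuffer, int, int)` constructor performs (`duplicate()`, `position(start)`, `limit(start + length)`, little-endian order) on a plain `ByteBuffer`; `WindowSketch.window` is a hypothetical helper, not a Parquet API:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper mirroring the constructor body from the diff.
final class WindowSketch {
  private WindowSketch() {}

  static ByteBuffer window(ByteBuffer source, int start, int length) {
    // duplicate() shares content but has its own position and limit,
    // so the caller's buffer is not modified
    ByteBuffer buffer = source.duplicate();
    buffer.position(start);
    buffer.limit(start + length);
    buffer.order(ByteOrder.LITTLE_ENDIAN);
    return buffer;
  }
}
```

A real test could make the same checks against the stream: `available()` equals `length`, a `readInt()` returns the expected little-endian value, and the source buffer's position and limit are unchanged.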
##########
parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java:
##########
@@ -379,4 +427,120 @@ public void remove() {
second.remove();
}
}
+
+ @Override
+ public byte readByte() throws IOException {
+ return (byte)readUnsignedByte();
Review Comment:
nit: can we add a space after the cast, i.e. `(byte) readUnsignedByte()`? Same for
all the other places.
##########
parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java:
##########
@@ -70,9 +105,22 @@ public int read(byte[] bytes, int offset, int length) throws IOException {
return bytesToRead;
}
-
+
+ @Override
+ public void readFully(byte[] bytes, int offset, int length) throws IOException {
+ try {
+ buffer.get(bytes, offset, length);
+ } catch (BufferUnderflowException|IndexOutOfBoundsException e) {
+ throw new EOFException(e.getMessage());
+ }
+ }
+
@Override
public long skip(long n) {
+ if (n < 0) {
+ throw new IllegalArgumentException();
Review Comment:
ditto: add some info to the exception
##########
parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java:
##########
@@ -238,8 +257,31 @@ public int read(byte[] bytes, int off, int len) {
}
@Override
- public int read(byte[] bytes) {
- return read(bytes, 0, bytes.length);
+ public void readFully(byte[] bytes, int off, int len) throws IOException {
Review Comment:
Why can't we just track the remaining bytes of the stream on the client side
and check before reading any bytes?
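A rough sketch of that idea: keep a running count of unread bytes across all buffers and fail with `EOFException` before copying anything. `RemainingTrackingReader` is hypothetical and only illustrates the check; it is not the PR's class:

```java
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical multi-buffer reader: checks a running byte count up front
// instead of catching exceptions from the underlying buffers.
final class RemainingTrackingReader {
  private final List<ByteBuffer> buffers = new ArrayList<>();
  private long remaining;
  private int index;

  RemainingTrackingReader(List<ByteBuffer> inputs) {
    for (ByteBuffer b : inputs) {
      buffers.add(b.duplicate());
      remaining += b.remaining();
    }
  }

  long remaining() {
    return remaining;
  }

  void readFully(byte[] bytes, int off, int len) throws EOFException {
    if (off < 0 || len < 0 || len > bytes.length - off) {
      throw new IndexOutOfBoundsException(
          "off=" + off + ", len=" + len + ", array length=" + bytes.length);
    }
    // Fail before copying anything, so a short read consumes no bytes
    if (len > remaining) {
      throw new EOFException("Not enough bytes to read: " + remaining + " < " + len);
    }
    while (len > 0) {
      ByteBuffer current = buffers.get(index);
      if (!current.hasRemaining()) {
        index++; // move on to the next (possibly empty) buffer
        continue;
      }
      int n = Math.min(len, current.remaining());
      current.get(bytes, off, n);
      off += n;
      len -= n;
      remaining -= n;
    }
  }
}
```

Checking up front also means a failed `readFully` leaves the read position unchanged, which may not hold for a catch-based version that has already copied bytes from earlier buffers.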
--
This is an automated message from the Apache Git Service.