This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 81f0d42 ARROW-7437: [Java] ReadChannel#readFully does not set writer
index correctly
81f0d42 is described below
commit 81f0d4228e594b10434021c5d9f8187a4e56fc6a
Author: liyafan82 <[email protected]>
AuthorDate: Fri Dec 27 21:28:29 2019 -0500
ARROW-7437: [Java] ReadChannel#readFully does not set writer index correctly
1. The writer index should be incremented by the amount of data actually
read.
2. When EOS is encounterned, the number of bytes read should be incremented
before returning.
Closes #6064 from liyafan82/fly_1219_idx and squashes the following commits:
e268b0d31 <liyafan82> Resolve comments
baab80532 <liyafan82> ReadChannel#readFully does not set writer index
correctly
Authored-by: liyafan82 <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../org/apache/arrow/vector/ipc/ReadChannel.java | 9 +++--
.../arrow/vector/ipc/TestArrowReaderWriter.java | 42 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
index 3c3069e..02061c7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
@@ -53,11 +53,14 @@ public class ReadChannel implements AutoCloseable {
* @throws IOException if nit enough bytes left to read
*/
public int readFully(ByteBuffer buffer) throws IOException {
- LOGGER.debug("Reading buffer with size: {}", buffer.remaining());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Reading buffer with size: {}", buffer.remaining());
+ }
int totalRead = 0;
while (buffer.remaining() != 0) {
int read = in.read(buffer);
- if (read < 0) {
+ if (read == -1) {
+ this.bytesRead += totalRead;
return totalRead;
}
totalRead += read;
@@ -79,7 +82,7 @@ public class ReadChannel implements AutoCloseable {
*/
public int readFully(ArrowBuf buffer, int l) throws IOException {
int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
- buffer.writerIndex(n);
+ buffer.writerIndex(buffer.writerIndex() + n);
return n;
}
diff --git
a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index dda402d..4d6da12 100644
---
a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++
b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -29,6 +29,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -525,4 +527,44 @@ public class TestArrowReaderWriter {
batch.close();
vector.close();
}
+
+ @Test
+ public void testChannelReadFully() throws IOException {
+ final ByteBuffer buf =
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+ buf.putInt(200);
+ buf.rewind();
+
+ try (ReadChannel channel = new ReadChannel(Channels.newChannel(new
ByteArrayInputStream(buf.array())));
+ ArrowBuf arrBuf = allocator.buffer(8)) {
+ arrBuf.setInt(0, 100);
+ arrBuf.writerIndex(4);
+ assertEquals(4, arrBuf.writerIndex());
+
+ int n = channel.readFully(arrBuf, 4);
+ assertEquals(4, n);
+ assertEquals(8, arrBuf.writerIndex());
+
+ assertEquals(100, arrBuf.getInt(0));
+ assertEquals(200, arrBuf.getInt(4));
+ }
+ }
+
+ @Test
+ public void testChannelReadFullyEos() throws IOException {
+ final ByteBuffer buf =
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+ buf.putInt(10);
+ buf.rewind();
+
+ try (ReadChannel channel = new ReadChannel(Channels.newChannel(new
ByteArrayInputStream(buf.array())));
+ ArrowBuf arrBuf = allocator.buffer(8)) {
+ int n = channel.readFully(arrBuf.nioBuffer(0, 8));
+ assertEquals(4, n);
+
+ // the input has only 4 bytes, so the number of bytes read should be 4
+ assertEquals(4, channel.bytesRead());
+
+ // the first 4 bytes have been read successfully.
+ assertEquals(10, arrBuf.getInt(0));
+ }
+ }
}