This is an automated email from the ASF dual-hosted git repository.
liyafan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ae4eb60 ARROW-9221: [Java] account for big-endian buffers in ArrowBuf.setBytes
ae4eb60 is described below
commit ae4eb6089316c68d202ced3602b6de1ad77d4878
Author: David Li <[email protected]>
AuthorDate: Fri Jul 3 10:53:37 2020 +0800
ARROW-9221: [Java] account for big-endian buffers in ArrowBuf.setBytes
`ArrowBuf.setBytes` has an override that uses an 8-byte-at-a-time copy loop
if the byte buffer does not provide an array and is not direct. Unfortunately,
this mangles data when the byte buffer is big-endian: each long is read in the
buffer's big-endian order and then written into the little-endian ArrowBuf.
This fixes it by temporarily setting the source buffer's byte order to
little-endian for the copy, and then restoring the original order.
Closes #7543 from lidavidm/arrow-9221
Authored-by: David Li <[email protected]>
Signed-off-by: liyafan82 <[email protected]>
---
.../arrow/flight/TestApplicationMetadata.java | 54 ++++++++++++++++++++++
.../java/org/apache/arrow/memory/ArrowBuf.java | 34 ++++++++------
.../java/org/apache/arrow/memory/TestArrowBuf.java | 24 +++++++++-
3 files changed, 97 insertions(+), 15 deletions(-)
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java
index 099b158..c7b3321 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java
@@ -196,6 +196,36 @@ public class TestApplicationMetadata {
});
}
+ /**
+ * ARROW-9221: Flight copies metadata from the byte buffer of a Protobuf
ByteString,
+ * which is in big-endian by default, thus mangling metadata.
+ */
+ @Test
+ public void testMetadataEndianness() throws Exception {
+ try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ final BufferAllocator serverAllocator =
allocator.newChildAllocator("flight-server", 0, Long.MAX_VALUE);
+ final FlightServer server = FlightTestUtil.getStartedServer(
+ (location) -> FlightServer
+ .builder(serverAllocator, location, new
EndianFlightProducer(serverAllocator))
+ .build());
+ final FlightClient client = FlightClient.builder(allocator,
server.getLocation()).build()) {
+ final Schema schema = new Schema(Collections.emptyList());
+ final FlightDescriptor descriptor = FlightDescriptor.command(new
byte[0]);
+ try (final SyncPutListener reader = new SyncPutListener();
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ final FlightClient.ClientStreamListener writer =
client.startPut(descriptor, root, reader);
+ writer.completed();
+ try (final PutResult metadata = reader.read()) {
+ Assert.assertEquals(16,
metadata.getApplicationMetadata().readableBytes());
+ byte[] bytes = new byte[16];
+ metadata.getApplicationMetadata().readBytes(bytes);
+ Assert.assertArrayEquals(EndianFlightProducer.EXPECTED_BYTES, bytes);
+ }
+ writer.getResult();
+ }
+ }
+ }
+
private void test(BiConsumer<BufferAllocator, FlightClient> fun) {
try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
final FlightServer s =
@@ -272,4 +302,28 @@ public class TestApplicationMetadata {
};
}
}
+
+ private static class EndianFlightProducer extends NoOpFlightProducer {
+ static final byte[] EXPECTED_BYTES = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15};
+ private final BufferAllocator allocator;
+
+ private EndianFlightProducer(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public Runnable acceptPut(CallContext context, FlightStream flightStream,
StreamListener<PutResult> ackStream) {
+ return () -> {
+ while (flightStream.next()) {
+ // Ignore any data
+ }
+
+ try (final ArrowBuf buf = allocator.buffer(16)) {
+ buf.writeBytes(EXPECTED_BYTES);
+ ackStream.onNext(PutResult.metadata(buf));
+ }
+ ackStream.onCompleted();
+ };
+ }
+ }
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
index 92a14c2..e81a92f 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
@@ -845,24 +845,30 @@ public final class ArrowBuf implements AutoCloseable {
// after copy, bump the next read position for the src ByteBuffer
src.position(src.position() + length);
} else {
- // copy word at a time
- while (length - 128 >= LONG_SIZE) {
- for (int x = 0; x < 16; x++) {
+ final ByteOrder originalByteOrder = src.order();
+ src.order(ByteOrder.LITTLE_ENDIAN);
+ try {
+ // copy word at a time
+ while (length - 128 >= LONG_SIZE) {
+ for (int x = 0; x < 16; x++) {
+ MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
+ length -= LONG_SIZE;
+ dstAddress += LONG_SIZE;
+ }
+ }
+ while (length >= LONG_SIZE) {
MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
length -= LONG_SIZE;
dstAddress += LONG_SIZE;
}
- }
- while (length >= LONG_SIZE) {
- MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
- length -= LONG_SIZE;
- dstAddress += LONG_SIZE;
- }
- // copy last byte
- while (length > 0) {
- MemoryUtil.UNSAFE.putByte(dstAddress, src.get());
- --length;
- ++dstAddress;
+ // copy last byte
+ while (length > 0) {
+ MemoryUtil.UNSAFE.putByte(dstAddress, src.get());
+ --length;
+ ++dstAddress;
+ }
+ } finally {
+ src.order(originalByteOrder);
}
}
}
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java b/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java
index 9b36a3b..ea74f21 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java
@@ -19,8 +19,10 @@ package org.apache.arrow.memory;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Arrays;
import org.junit.AfterClass;
@@ -36,7 +38,7 @@ public class TestArrowBuf {
public static void beforeClass() {
allocator = new RootAllocator(MAX_ALLOCATION);
}
-
+
/** Ensure the allocator is closed. */
@AfterClass
public static void afterClass() {
@@ -124,4 +126,24 @@ public class TestArrowBuf {
}
}
+ /** ARROW-9221: guard against big-endian byte buffers. */
+ @Test
+ public void testSetBytesBigEndian() {
+ final byte[] expected = new byte[64];
+ for (int i = 0; i < expected.length; i++) {
+ expected[i] = (byte) i;
+ }
+ // Only this code path is susceptible: others use unsafe or byte-by-byte
copies, while this override copies longs.
+ final ByteBuffer data = ByteBuffer.wrap(expected).asReadOnlyBuffer();
+ assertFalse(data.hasArray());
+ assertFalse(data.isDirect());
+ assertEquals(ByteOrder.BIG_ENDIAN, data.order());
+ try (ArrowBuf buf = allocator.buffer(expected.length)) {
+ buf.setBytes(0, data);
+ byte[] actual = new byte[expected.length];
+ buf.getBytes(0, actual);
+ assertArrayEquals(expected, actual);
+ }
+ }
+
}