This is an automated email from the ASF dual-hosted git repository.

liyafan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ae4eb60  ARROW-9221: [Java] account for big-endian buffers in ArrowBuf.setBytes
ae4eb60 is described below

commit ae4eb6089316c68d202ced3602b6de1ad77d4878
Author: David Li <[email protected]>
AuthorDate: Fri Jul 3 10:53:37 2020 +0800

    ARROW-9221: [Java] account for big-endian buffers in ArrowBuf.setBytes
    
    `ArrowBuf.setBytes` has an override that uses an 8-byte-at-a-time copy loop
    if the byte buffer does not provide an array and is not direct.
    Unfortunately, this mangles data when the byte buffer is big-endian:
    `getLong()` reads each 8-byte word in the buffer's big-endian order, and the
    word is then written into the little-endian ArrowBuf, reversing the bytes
    within each word. This fixes it by setting the source buffer's byte order to
    little-endian before copying, and restoring the original order afterwards
    (in a `finally` block).
    
    Closes #7543 from lidavidm/arrow-9221
    
    Authored-by: David Li <[email protected]>
    Signed-off-by: liyafan82 <[email protected]>
---
 .../arrow/flight/TestApplicationMetadata.java      | 54 ++++++++++++++++++++++
 .../java/org/apache/arrow/memory/ArrowBuf.java     | 34 ++++++++------
 .../java/org/apache/arrow/memory/TestArrowBuf.java | 24 +++++++++-
 3 files changed, 97 insertions(+), 15 deletions(-)

diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java
index 099b158..c7b3321 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java
@@ -196,6 +196,36 @@ public class TestApplicationMetadata {
     });
   }
 
+  /**
+   * ARROW-9221: Flight copies metadata from the byte buffer of a Protobuf ByteString,
+   * which is in big-endian by default, thus mangling metadata.
+   */
+  @Test
+  public void testMetadataEndianness() throws Exception {
+    try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+         final BufferAllocator serverAllocator = allocator.newChildAllocator("flight-server", 0, Long.MAX_VALUE);
+         final FlightServer server = FlightTestUtil.getStartedServer(
+             (location) -> FlightServer
+                 .builder(serverAllocator, location, new EndianFlightProducer(serverAllocator))
+                 .build());
+         final FlightClient client = FlightClient.builder(allocator, server.getLocation()).build()) {
+      final Schema schema = new Schema(Collections.emptyList());
+      final FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]);
+      try (final SyncPutListener reader = new SyncPutListener();
+           final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, reader);
+        writer.completed();
+        try (final PutResult metadata = reader.read()) {
+          Assert.assertEquals(16, metadata.getApplicationMetadata().readableBytes());
+          byte[] bytes = new byte[16];
+          metadata.getApplicationMetadata().readBytes(bytes);
+          Assert.assertArrayEquals(EndianFlightProducer.EXPECTED_BYTES, bytes);
+        }
+        writer.getResult();
+      }
+    }
+  }
+
   private void test(BiConsumer<BufferAllocator, FlightClient> fun) {
     try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         final FlightServer s =
@@ -272,4 +302,28 @@ public class TestApplicationMetadata {
       };
     }
   }
+
+  private static class EndianFlightProducer extends NoOpFlightProducer {
+    static final byte[] EXPECTED_BYTES = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+    private final BufferAllocator allocator;
+
+    private EndianFlightProducer(BufferAllocator allocator) {
+      this.allocator = allocator;
+    }
+
+    @Override
+    public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
+      return () -> {
+        while (flightStream.next()) {
+          // Ignore any data
+        }
+
+        try (final ArrowBuf buf = allocator.buffer(16)) {
+          buf.writeBytes(EXPECTED_BYTES);
+          ackStream.onNext(PutResult.metadata(buf));
+        }
+        ackStream.onCompleted();
+      };
+    }
+  }
 }
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
index 92a14c2..e81a92f 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
@@ -845,24 +845,30 @@ public final class ArrowBuf implements AutoCloseable {
         // after copy, bump the next read position for the src ByteBuffer
         src.position(src.position() + length);
       } else {
-        // copy word at a time
-        while (length - 128 >= LONG_SIZE) {
-          for (int x = 0; x < 16; x++) {
+        final ByteOrder originalByteOrder = src.order();
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        try {
+          // copy word at a time
+          while (length - 128 >= LONG_SIZE) {
+            for (int x = 0; x < 16; x++) {
+              MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
+              length -= LONG_SIZE;
+              dstAddress += LONG_SIZE;
+            }
+          }
+          while (length >= LONG_SIZE) {
             MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
             length -= LONG_SIZE;
             dstAddress += LONG_SIZE;
           }
-        }
-        while (length >= LONG_SIZE) {
-          MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
-          length -= LONG_SIZE;
-          dstAddress += LONG_SIZE;
-        }
-        // copy last byte
-        while (length > 0) {
-          MemoryUtil.UNSAFE.putByte(dstAddress, src.get());
-          --length;
-          ++dstAddress;
+          // copy last byte
+          while (length > 0) {
+            MemoryUtil.UNSAFE.putByte(dstAddress, src.get());
+            --length;
+            ++dstAddress;
+          }
+        } finally {
+          src.order(originalByteOrder);
         }
       }
     }
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java b/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java
index 9b36a3b..ea74f21 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java
@@ -19,8 +19,10 @@ package org.apache.arrow.memory;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 
 import org.junit.AfterClass;
@@ -36,7 +38,7 @@ public class TestArrowBuf {
   public static void beforeClass() {
     allocator = new RootAllocator(MAX_ALLOCATION);
   }
-  
+
   /** Ensure the allocator is closed. */
   @AfterClass
   public static void afterClass() {
@@ -124,4 +126,24 @@ public class TestArrowBuf {
     }
   }
 
+  /** ARROW-9221: guard against big-endian byte buffers. */
+  @Test
+  public void testSetBytesBigEndian() {
+    final byte[] expected = new byte[64];
+    for (int i = 0; i < expected.length; i++) {
+      expected[i] = (byte) i;
+    }
+    // Only this code path is susceptible: others use unsafe or byte-by-byte copies, while this override copies longs.
+    final ByteBuffer data = ByteBuffer.wrap(expected).asReadOnlyBuffer();
+    assertFalse(data.hasArray());
+    assertFalse(data.isDirect());
+    assertEquals(ByteOrder.BIG_ENDIAN, data.order());
+    try (ArrowBuf buf = allocator.buffer(expected.length)) {
+      buf.setBytes(0, data);
+      byte[] actual = new byte[expected.length];
+      buf.getBytes(0, actual);
+      assertArrayEquals(expected, actual);
+    }
+  }
+
 }

Reply via email to