This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fca88d01 ORC-1473: Fix zero copy read bugs
9fca88d01 is described below

commit 9fca88d01c1fc0fc47d8a0133bd9378f3537098d
Author: xiebin <[email protected]>
AuthorDate: Thu Sep 21 11:01:44 2023 -0700

    ORC-1473: Fix zero copy read bugs
    
    ### What changes were proposed in this pull request?
    #### org.apache.orc.impl.RecordReaderUtils#zeroCopyReadRanges
    `ByteBuffer copy = currentBuffer.duplicate();`
    `copy.position((int) (current.getOffset() - currentOffset));`
    If the position of `currentBuffer` is not 0, `copy.position()` will set an incorrect
    position. We should use `slice()` instead of `duplicate()`.
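    A minimal standalone sketch (not part of this patch, names are illustrative) of why
    `duplicate()` goes wrong when the source buffer's position is non-zero, while
    `slice()` rebases its index to the current position:

    ```java
    import java.nio.ByteBuffer;

    public class DuplicateVsSlice {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        for (int i = 0; i < 16; i++) {
          buf.put((byte) i);
        }
        buf.position(4);                  // simulate a buffer that was already partially consumed

        // duplicate() keeps the original index space: index 2 is absolute offset 2
        ByteBuffer dup = buf.duplicate();
        dup.position(2);
        System.out.println(dup.get());    // prints 2, not the byte the range math expects

        // slice() starts a fresh index space at the current position: index 2 is offset 4 + 2
        ByteBuffer sliced = buf.slice();
        sliced.position(2);
        System.out.println(sliced.get()); // prints 6, the correct byte for the range
      }
    }
    ```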
    
    #### org.apache.orc.impl.reader.StripePlanner.StreamInformation#releaseBuffers
    The buffer returned by `BufferChunk#getData()` is not the original buffer from
    `HadoopShims.ZeroCopyReaderShim#readBuffer(int, boolean)`; it is a `slice()` or
    `duplicate()` of the original buffer, so `releaseBuffer()` will throw an
    `IllegalArgumentException`.
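    A hypothetical sketch (the class and method names below are illustrative, not the
    Hadoop implementation) of why releasing a derived buffer fails when the producer
    tracks handed-out buffers by object identity:

    ```java
    import java.nio.ByteBuffer;
    import java.util.IdentityHashMap;

    class IdentityTrackedPool {
      // buffers handed out by readBuffer(), keyed by object identity
      private final IdentityHashMap<ByteBuffer, Boolean> outstanding = new IdentityHashMap<>();

      ByteBuffer readBuffer(int length) {
        ByteBuffer buf = ByteBuffer.allocate(length);
        outstanding.put(buf, Boolean.TRUE);
        return buf;
      }

      void releaseBuffer(ByteBuffer buf) {
        // a slice()/duplicate() is a different object, so the identity lookup misses
        if (outstanding.remove(buf) == null) {
          throw new IllegalArgumentException("buffer was not created by this pool: " + buf);
        }
      }

      public static void main(String[] args) {
        IdentityTrackedPool pool = new IdentityTrackedPool();
        ByteBuffer original = pool.readBuffer(64);
        pool.releaseBuffer(original.slice());   // throws IllegalArgumentException
      }
    }
    ```

    This is why the patch moves the responsibility to the shim that created the buffers:
    `releaseAllBuffers()` releases exactly the objects that were handed out.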
    
    ### Why are the changes needed?
    To fix the two zero copy read bugs described above.
    
    ### How was this patch tested?
    I have added a `MockDFSDataInputStream` mock and a UT to reproduce and test the
    bugs. The fix can also be verified manually against an HDFS cluster.
    
    Closes #1574 from xbthink/dev.
    
    Authored-by: xiebin <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 java/core/src/java/org/apache/orc/DataReader.java  |   9 ++
 .../org/apache/orc/impl/RecordReaderUtils.java     |  18 +++-
 .../org/apache/orc/impl/reader/StripePlanner.java  |  15 +--
 .../apache/orc/impl/MockDFSDataInputStream.java    | 104 +++++++++++++++++++++
 .../test/org/apache/orc/impl/MockDataReader.java   |  10 ++
 .../org/apache/orc/impl/TestRecordReaderUtils.java |  44 +++++++++
 .../src/java/org/apache/orc/impl/HadoopShims.java  |  10 ++
 .../java/org/apache/orc/impl/ZeroCopyShims.java    |  25 ++++-
 8 files changed, 224 insertions(+), 11 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index f181d6b22..d6f61016b 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -54,9 +54,18 @@ public interface DataReader extends AutoCloseable, Cloneable {
   /**
    * Releases buffers created by readFileData. See readFileData javadoc.
    * @param toRelease The buffer to release.
+   *
+   * @deprecated Use {@link #releaseAllBuffers()} instead. This method was
+   * incorrectly used by upper level code and shouldn't be used anymore.
    */
+  @Deprecated
   void releaseBuffer(ByteBuffer toRelease);
 
+  /**
+   * Releases all buffers created by readFileData. See readFileData javadoc.
+   */
+  void releaseAllBuffers();
+
   /**
    * Clone the entire state of the DataReader with the assumption that the
    * clone will be closed at a different time. Thus, any file handles in the
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index ede844276..8c37246b7 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -131,11 +131,21 @@ public class RecordReaderUtils {
       return zcr != null;
     }
 
+    /**
+     * @deprecated Use {@link #releaseAllBuffers()} instead. This method was
+     * incorrectly used by upper level code and shouldn't be used anymore.
+     */
+    @Deprecated
     @Override
     public void releaseBuffer(ByteBuffer buffer) {
       zcr.releaseBuffer(buffer);
     }
 
+    @Override
+    public void releaseAllBuffers() {
+      zcr.releaseAllBuffers();
+    }
+
     @Override
     public DataReader clone() {
       if (this.file != null) {
@@ -374,7 +384,7 @@ public class RecordReaderUtils {
 
       // did we get the current range in a single read?
       if (currentOffset + currentBuffer.remaining() >= current.getEnd()) {
-        ByteBuffer copy = currentBuffer.duplicate();
+        ByteBuffer copy = currentBuffer.slice();
         copy.position((int) (current.getOffset() - currentOffset));
         copy.limit(copy.position() + current.getLength());
         current.setChunk(copy);
@@ -385,7 +395,7 @@ public class RecordReaderUtils {
                               ? ByteBuffer.allocateDirect(current.getLength())
                               : ByteBuffer.allocate(current.getLength());
         // we know that the range spans buffers
-        ByteBuffer copy = currentBuffer.duplicate();
+        ByteBuffer copy = currentBuffer.slice();
         // skip over the front matter
         copy.position((int) (current.getOffset() - currentOffset));
         result.put(copy);
@@ -394,11 +404,11 @@ public class RecordReaderUtils {
         currentBuffer = buffers.next();
         while (result.hasRemaining()) {
           if (result.remaining() > currentBuffer.remaining()) {
-            result.put(currentBuffer.duplicate());
+            result.put(currentBuffer.slice());
             currentOffset += currentBuffer.remaining();
             currentBuffer = buffers.next();
           } else {
-            copy = currentBuffer.duplicate();
+            copy = currentBuffer.slice();
             copy.limit(result.remaining());
             result.put(copy);
           }
diff --git a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
index d5aad0834..23afe8918 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
@@ -28,6 +28,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.impl.BufferChunkList;
 import org.apache.orc.impl.CryptoUtils;
+import org.apache.orc.impl.HadoopShims;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.PhysicalFsWriter;
@@ -214,12 +215,7 @@ public class StripePlanner {
    */
   public void clearStreams() {
     if (dataReader.isTrackingDiskRanges()) {
-      for (StreamInformation stream : indexStreams) {
-        stream.releaseBuffers(dataReader);
-      }
-      for (StreamInformation stream : dataStreams) {
-        stream.releaseBuffers(dataReader);
-      }
+      dataReader.releaseAllBuffers();
     }
     indexStreams.clear();
     dataStreams.clear();
@@ -619,6 +615,13 @@ public class StripePlanner {
       this.length = length;
     }
 
+    /**
+     * @deprecated The buffer returned by {@link BufferChunk#getData()} is not the original buffer
+     * from {@link HadoopShims.ZeroCopyReaderShim#readBuffer(int, boolean)}, it is the slice()
+     * or duplicate() of the original buffer. So this releaseBuffers() is incorrect and we should
+     * use {@link DataReader#releaseAllBuffers()} instead.
+     */
+    @Deprecated
     void releaseBuffers(DataReader reader) {
       long end = offset + length;
       BufferChunk ptr = firstChunk;
diff --git a/java/core/src/test/org/apache/orc/impl/MockDFSDataInputStream.java b/java/core/src/test/org/apache/orc/impl/MockDFSDataInputStream.java
new file mode 100644
index 000000000..d5135c0bf
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/MockDFSDataInputStream.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.IdentityHashMap;
+
+public class MockDFSDataInputStream extends InputStream implements Seekable, PositionedReadable, HasEnhancedByteBufferAccess {
+
+  private ByteBuffer hdfsBlockBuffer;
+  private int startPosition;
+  private int currentPosition;
+  private IdentityHashMap<ByteBuffer, ByteBufferPool> bufferStore = new IdentityHashMap(0);
+
+  public MockDFSDataInputStream(ByteBuffer hdfsBlockBuffer, int startPosition) {
+    this.hdfsBlockBuffer = hdfsBlockBuffer;
+    this.startPosition = startPosition;
+    this.currentPosition = startPosition;
+  }
+
+  @Override
+  public int read() throws IOException {
+    currentPosition++;
+    return hdfsBlockBuffer.get();
+  }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool byteBufferPool, int i, EnumSet<ReadOption> enumSet) throws IOException, UnsupportedOperationException {
+    ByteBuffer copy = hdfsBlockBuffer.duplicate();
+    copy.limit(copy.position() + i);
+    currentPosition += i;
+    hdfsBlockBuffer.position(currentPosition - startPosition);
+    bufferStore.put(copy, byteBufferPool);
+    return copy;
+  }
+
+  @Override
+  public void releaseBuffer(ByteBuffer byteBuffer) {
+    Object val = bufferStore.remove(byteBuffer);
+    if (val == null) {
+      throw new IllegalArgumentException("tried to release a buffer that was not created by this stream, " + byteBuffer);
+    }
+  }
+
+  @Override
+  public void seek(long l) throws IOException {
+    currentPosition = (int) l;
+    hdfsBlockBuffer.position(currentPosition - startPosition);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return currentPosition;
+  }
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    throw new RuntimeException("unsupported");
+  }
+
+  public boolean isAllReleased() {
+    return bufferStore.isEmpty();
+  }
+
+  @Override
+  public int read(long l, byte[] bytes, int i, int i1) throws IOException {
+    throw new RuntimeException("unsupported");
+  }
+
+  @Override
+  public void readFully(long l, byte[] bytes, int i, int i1) throws IOException {
+    throw new RuntimeException("unsupported");
+  }
+
+  @Override
+  public void readFully(long l, byte[] bytes) throws IOException {
+    throw new RuntimeException("unsupported");
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/MockDataReader.java b/java/core/src/test/org/apache/orc/impl/MockDataReader.java
index 231f8110e..9e2a231a1 100644
--- a/java/core/src/test/org/apache/orc/impl/MockDataReader.java
+++ b/java/core/src/test/org/apache/orc/impl/MockDataReader.java
@@ -98,11 +98,21 @@ public class MockDataReader implements DataReader {
     return true;
   }
 
+  /**
+   * @deprecated Use {@link #releaseAllBuffers()} instead. This method was
+   * incorrectly used by upper level code and shouldn't be used anymore.
+   */
+  @Deprecated
   @Override
   public void releaseBuffer(ByteBuffer toRelease) {
     outBuffers.remove(toRelease);
   }
 
+  @Override
+  public void releaseAllBuffers() {
+    outBuffers.clear();
+  }
+
   @Override
   public DataReader clone() {
     throw new UnsupportedOperationException("Clone not supported.");
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java
index 1153b6751..d5f3550d3 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java
@@ -18,14 +18,20 @@
 
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Objects;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 
 class TestRecordReaderUtils {
 
@@ -140,6 +146,44 @@ class TestRecordReaderUtils {
     assertEquals(chunkReader.getReadBytes(), chunkReader.getFrom().getData().array().length);
   }
 
+  @Test
+  public void testZeroCopyReadAndRelease() throws IOException {
+    int blockSize = 4096;
+    ByteBuffer hdfsBlockMMapBuffer = makeByteBuffer(blockSize, 0);
+    int blockStartPosition = 4096;
+    MockDFSDataInputStream dis = new MockDFSDataInputStream(hdfsBlockMMapBuffer, blockStartPosition);
+    FSDataInputStream fis = new FSDataInputStream(dis);
+    RecordReaderUtils.ByteBufferAllocatorPool pool = new RecordReaderUtils.ByteBufferAllocatorPool();
+    HadoopShims.ZeroCopyReaderShim zrc = RecordReaderUtils.createZeroCopyShim(fis, null, pool);
+    BufferChunkList rangeList = new TestOrcLargeStripe.RangeBuilder()
+            .range(5000, 1000)
+            .range(6000, 1000)
+            .range(7000, 500).build();
+    RecordReaderUtils.zeroCopyReadRanges(fis, zrc, rangeList.get(0), rangeList.get(2), false);
+
+    assertArrayEquals(Arrays.copyOfRange(hdfsBlockMMapBuffer.array(), 5000 - blockStartPosition, 5000 - blockStartPosition + 1000), byteBufferToArray(rangeList.get(0).getData()));
+    assertArrayEquals(Arrays.copyOfRange(hdfsBlockMMapBuffer.array(), 6000 - blockStartPosition, 6000 - blockStartPosition + 1000), byteBufferToArray(rangeList.get(1).getData()));
+    assertArrayEquals(Arrays.copyOfRange(hdfsBlockMMapBuffer.array(), 7000 - blockStartPosition, 7000 - blockStartPosition + 500), byteBufferToArray(rangeList.get(2).getData()));
+
+    assertThrowsExactly(IllegalArgumentException.class, new Executable() {
+      @Override
+      public void execute() throws Throwable {
+        zrc.releaseBuffer(rangeList.get(0).getData());
+      }
+    });
+
+    zrc.releaseAllBuffers();
+
+    assertTrue(dis.isAllReleased());
+  }
+
+  private static byte[] byteBufferToArray(ByteBuffer buf) {
+    byte[] resultArray = new byte[buf.remaining()];
+    ByteBuffer buffer = buf.slice();
+    buffer.get(resultArray);
+    return resultArray;
+  }
+
   private ByteBuffer makeByteBuffer(int length, long offset) {
     byte[] readBytes = new byte[length];
     for (int i = 0; i < readBytes.length; i++) {
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
index 6c8d5ca37..d15768de8 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
@@ -103,9 +103,19 @@ public interface HadoopShims {
     /**
      * Release a ByteBuffer obtained from a readBuffer on this
      * ZeroCopyReaderShim.
+     *
+     * @deprecated Use {@link #releaseAllBuffers()} instead. This method was
+     * incorrectly used by upper level code and shouldn't be used anymore.
      */
+    @Deprecated
     void releaseBuffer(ByteBuffer buffer);
 
+    /**
+     * Release all ByteBuffers obtained from readBuffer on this
+     * ZeroCopyReaderShim.
+     */
+    void releaseAllBuffers();
+
     /**
      * Close the underlying stream.
      */
diff --git a/java/shims/src/java/org/apache/orc/impl/ZeroCopyShims.java b/java/shims/src/java/org/apache/orc/impl/ZeroCopyShims.java
index 07e4aa52a..c1a86cd2e 100644
--- a/java/shims/src/java/org/apache/orc/impl/ZeroCopyShims.java
+++ b/java/shims/src/java/org/apache/orc/impl/ZeroCopyShims.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.ByteBufferPool;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
+import java.util.IdentityHashMap;
 
 class ZeroCopyShims {
   private static final class ByteBufferPoolAdapter implements ByteBufferPool {
@@ -52,6 +53,11 @@ class ZeroCopyShims {
     private static final EnumSet<ReadOption> NO_CHECK_SUM = EnumSet
         .of(ReadOption.SKIP_CHECKSUMS);
 
+    // Use IdentityHashMap like hadoop's IdentityHashStore.
+    // It compares keys using {@link System#identityHashCode(Object)} and the identity operator.
+    // This is useful for types like ByteBuffer which have expensive hashCode and equals operators.
+    private final IdentityHashMap<ByteBuffer, Object> readBuffers = new IdentityHashMap<>(0);
+
     ZeroCopyAdapter(FSDataInputStream in,
                            HadoopShims.ByteBufferPoolShim poolshim) {
       this.in = in;
@@ -69,16 +75,33 @@ class ZeroCopyShims {
       if (verifyChecksums) {
         options = CHECK_SUM;
       }
-      return this.in.read(this.pool, maxLength, options);
+
+      ByteBuffer bb = this.in.read(this.pool, maxLength, options);
+      readBuffers.put(bb, null);
+      return bb;
     }
 
+    /**
+     * @deprecated Use {@link #releaseAllBuffers()} instead. This method was
+     * incorrectly used by upper level code and shouldn't be used anymore.
+     */
+    @Deprecated
     @Override
     public void releaseBuffer(ByteBuffer buffer) {
       this.in.releaseBuffer(buffer);
     }
 
+    @Override
+    public void releaseAllBuffers() {
+      readBuffers.forEach((k, v) -> {
+        this.in.releaseBuffer(k);
+      });
+      readBuffers.clear();
+    }
+
     @Override
     public void close() throws IOException {
+      releaseAllBuffers();
       this.in.close();
     }
   }
