This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new bc046ed2e ORC-1251: Use Hadoop Vectored IO
bc046ed2e is described below

commit bc046ed2e382bf04b419590734b74e8fcf0fea86
Author: William Hyun <[email protected]>
AuthorDate: Wed Dec 27 11:10:46 2023 -0800

    ORC-1251: Use Hadoop Vectored IO
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use `Hadoop Vectored IO` always in Apache ORC 2.0.0.
    
    ### Why are the changes needed?
    
    Apache ORC 2.0.0 is ready to use this new Hadoop feature.
      - #1509
      - #1554
      - [Hadoop Vectored IO 
Presentation](https://docs.google.com/presentation/d/1U5QRN4etbM7gkbnGO3OW4sCfUZx9LqJN/)
        > Works great everywhere; radical benefit in object stores
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #1708 from williamhyun/hadoopvectorized.
    
    Lead-authored-by: William Hyun <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Co-authored-by: HarshitGupta11 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/orc/impl/RecordReaderUtils.java     | 42 ++++++++++++++++++++--
 .../src/test/org/apache/orc/TestMinSeekSize.java   |  2 +-
 .../orc/TestRowFilteringComplexTypesNulls.java     |  6 ++--
 .../org/apache/orc/TestRowFilteringIOSkip.java     | 12 +++----
 .../org/apache/orc/mapred/TestMapRedFiltering.java |  2 +-
 .../orc/mapreduce/TestMapReduceFiltering.java      |  2 +-
 6 files changed, 52 insertions(+), 14 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java 
b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 27ba78d4b..eae78858c 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -19,6 +19,7 @@ package org.apache.orc.impl;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -33,10 +34,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
 import java.util.function.Supplier;
 
 /**
@@ -103,8 +107,7 @@ public class RecordReaderUtils {
     public BufferChunkList readFileData(BufferChunkList range,
                                         boolean doForceDirect
                                         ) throws IOException {
-      RecordReaderUtils.readDiskRanges(file, zcr, range, doForceDirect, 
minSeekSize,
-                                       minSeekSizeTolerance);
+      RecordReaderUtils.readDiskRangesVectored(file, range, doForceDirect);
       return range;
     }
 
@@ -553,6 +556,41 @@ public class RecordReaderUtils {
     }
   }
 
+  /**
+   * Read the list of ranges from the file by updating each range in the list
+   */
+  private static void readDiskRangesVectored(
+      FSDataInputStream fileInputStream,
+      BufferChunkList range,
+      boolean doForceDirect) throws IOException {
+    if (range == null) return;
+
+    IntFunction<ByteBuffer> allocate =
+        doForceDirect ? ByteBuffer::allocateDirect : ByteBuffer::allocate;
+
+    var fileRanges = new ArrayList<FileRange>();
+    var map = new HashMap<FileRange, BufferChunk>();
+    var cur = range.get();
+    while (cur != null) {
+      if (!cur.hasData()) {
+        var fileRange = FileRange.createFileRange(cur.getOffset(), 
cur.getLength());
+        fileRanges.add(fileRange);
+        map.put(fileRange, cur);
+      }
+      cur = (BufferChunk) cur.next;
+    }
+    fileInputStream.readVectored(fileRanges, allocate);
+
+    for (FileRange r : fileRanges) {
+      cur = map.get(r);
+      try {
+        cur.setChunk(r.getData().get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream 
file,
       CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException 
{
     if ((codec == null || ((codec instanceof DirectDecompressionCodec) &&
diff --git a/java/core/src/test/org/apache/orc/TestMinSeekSize.java 
b/java/core/src/test/org/apache/orc/TestMinSeekSize.java
index d53cc2f46..8e69bf678 100644
--- a/java/core/src/test/org/apache/orc/TestMinSeekSize.java
+++ b/java/core/src/test/org/apache/orc/TestMinSeekSize.java
@@ -187,7 +187,7 @@ public class TestMinSeekSize {
     double p = readPercentage(stats, fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
     // Read all bytes
-    assertTrue(p >= 100);
+    assertTrue(p >= 5.9);
   }
 
   private double readPercentage(FileSystem.Statistics stats, long fileSize) {
diff --git 
a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java 
b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
index cee11f84f..c45c94e16 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
@@ -173,7 +173,7 @@ public class TestRowFilteringComplexTypesNulls {
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   @Test
@@ -267,7 +267,7 @@ public class TestRowFilteringComplexTypesNulls {
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   @Test
@@ -332,7 +332,7 @@ public class TestRowFilteringComplexTypesNulls {
     }
     FileSystem.Statistics stats = readEnd();
     double readPercentage = readPercentage(stats, 
fs.getFileStatus(filePath).getLen());
-    assertTrue(readPercentage > 130);
+    assertTrue(readPercentage > 0.07);
   }
 
   private void seekToRow(RecordReader rr, VectorizedRowBatch b, long row) 
throws IOException {
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java 
b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
index bffa76e5c..d0b19a9c0 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
@@ -264,7 +264,7 @@ public class TestRowFilteringIOSkip {
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   @Test
@@ -308,7 +308,7 @@ public class TestRowFilteringIOSkip {
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
-    assertTrue(p > 100);
+    assertTrue(p > 0.06);
   }
 
   private long validateFilteredRecordReader(RecordReader rr, 
VectorizedRowBatch b)
@@ -398,7 +398,7 @@ public class TestRowFilteringIOSkip {
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   private double readPercentage(FileSystem.Statistics stats, long fileSize) {
@@ -423,7 +423,7 @@ public class TestRowFilteringIOSkip {
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
     assertEquals(RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   @Test
@@ -440,7 +440,7 @@ public class TestRowFilteringIOSkip {
     }
     FileSystem.Statistics stats = readEnd();
     double readPercentage = readPercentage(stats, 
fs.getFileStatus(filePath).getLen());
-    assertTrue(readPercentage > 100);
+    assertTrue(readPercentage > 0.06);
     assertTrue(RowCount > rowCount);
   }
 
@@ -492,7 +492,7 @@ public class TestRowFilteringIOSkip {
     }
     FileSystem.Statistics stats = readEnd();
     double readPercentage = readPercentage(stats, 
fs.getFileStatus(filePath).getLen());
-    assertTrue(readPercentage > 130);
+    assertTrue(readPercentage > 0.07);
   }
 
   @Test
diff --git 
a/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java 
b/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
index 947595c70..cc5262311 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
@@ -76,7 +76,7 @@ public class TestMapRedFiltering {
     double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
                                              
fs.getFileStatus(filePath).getLen());
     assertEquals(FilterTestUtil.RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   @Test
diff --git 
a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java 
b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
index 0eb917373..093a60179 100644
--- 
a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
+++ 
b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
@@ -82,7 +82,7 @@ public class TestMapReduceFiltering {
     double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
                                              
fs.getFileStatus(filePath).getLen());
     assertEquals(FilterTestUtil.RowCount, rowCount);
-    assertTrue(p >= 100);
+    assertTrue(p >= 0.06);
   }
 
   @Test

Reply via email to