This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new bc046ed2e ORC-1251: Use Hadoop Vectored IO
bc046ed2e is described below
commit bc046ed2e382bf04b419590734b74e8fcf0fea86
Author: William Hyun <[email protected]>
AuthorDate: Wed Dec 27 11:10:46 2023 -0800
ORC-1251: Use Hadoop Vectored IO
### What changes were proposed in this pull request?
This PR aims to always use `Hadoop Vectored IO` in Apache ORC 2.0.0.
### Why are the changes needed?
Apache ORC 2.0.0 is ready to use this new Hadoop feature.
- #1509
- #1554
- [Hadoop Vectored IO Presentation](https://docs.google.com/presentation/d/1U5QRN4etbM7gkbnGO3OW4sCfUZx9LqJN/)
> Works great everywhere; radical benefit in object stores
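For reference, the Hadoop API adopted here works as follows: describe all byte ranges up front as `FileRange`s, hand them to a single `readVectored` call together with a buffer allocator, then wait on each range's `CompletableFuture`. Below is a minimal standalone sketch against Hadoop 3.3.5+ (the `FileSystem` setup and the offsets/lengths are hypothetical, chosen only for illustration); the new `readDiskRangesVectored` method in this patch follows the same pattern with ORC's `BufferChunk`s.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path(args[0]))) {
      // Describe every range up front instead of issuing one seek+read per range.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 4096),
          FileRange.createFileRange(1 << 20, 8192));
      // A single call; the underlying file system may coalesce nearby ranges
      // or fetch them in parallel, which is where object stores benefit most.
      in.readVectored(ranges, ByteBuffer::allocate);
      // Each range completes independently through its CompletableFuture.
      for (FileRange r : ranges) {
        ByteBuffer data = r.getData().get();
        System.out.println("read " + data.remaining() + " bytes");
      }
    }
  }
}
```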
### How was this patch tested?
Pass the CIs.
Closes #1708 from williamhyun/hadoopvectorized.
Lead-authored-by: William Hyun <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: HarshitGupta11 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/orc/impl/RecordReaderUtils.java | 42 ++++++++++++++++++++--
.../src/test/org/apache/orc/TestMinSeekSize.java | 2 +-
.../orc/TestRowFilteringComplexTypesNulls.java | 6 ++--
.../org/apache/orc/TestRowFilteringIOSkip.java | 12 +++----
.../org/apache/orc/mapred/TestMapRedFiltering.java | 2 +-
.../orc/mapreduce/TestMapReduceFiltering.java | 2 +-
6 files changed, 52 insertions(+), 14 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 27ba78d4b..eae78858c 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -19,6 +19,7 @@ package org.apache.orc.impl;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -33,10 +34,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
import java.util.function.Supplier;
/**
@@ -103,8 +107,7 @@ public class RecordReaderUtils {
public BufferChunkList readFileData(BufferChunkList range,
boolean doForceDirect
) throws IOException {
- RecordReaderUtils.readDiskRanges(file, zcr, range, doForceDirect, minSeekSize,
- minSeekSizeTolerance);
+ RecordReaderUtils.readDiskRangesVectored(file, range, doForceDirect);
return range;
}
@@ -553,6 +556,41 @@ public class RecordReaderUtils {
}
}
+ /**
+ * Read the list of ranges from the file, updating each range in the list.
+ */
+ private static void readDiskRangesVectored(
+ FSDataInputStream fileInputStream,
+ BufferChunkList range,
+ boolean doForceDirect) throws IOException {
+ if (range == null) return;
+
+ IntFunction<ByteBuffer> allocate =
+ doForceDirect ? ByteBuffer::allocateDirect : ByteBuffer::allocate;
+
+ var fileRanges = new ArrayList<FileRange>();
+ var map = new HashMap<FileRange, BufferChunk>();
+ var cur = range.get();
+ while (cur != null) {
+ if (!cur.hasData()) {
+ var fileRange = FileRange.createFileRange(cur.getOffset(), cur.getLength());
+ fileRanges.add(fileRange);
+ map.put(fileRange, cur);
+ }
+ cur = (BufferChunk) cur.next;
+ }
+ fileInputStream.readVectored(fileRanges, allocate);
+
+ for (FileRange r : fileRanges) {
+ cur = map.get(r);
+ try {
+ cur.setChunk(r.getData().get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
if ((codec == null || ((codec instanceof DirectDecompressionCodec) &&
diff --git a/java/core/src/test/org/apache/orc/TestMinSeekSize.java b/java/core/src/test/org/apache/orc/TestMinSeekSize.java
index d53cc2f46..8e69bf678 100644
--- a/java/core/src/test/org/apache/orc/TestMinSeekSize.java
+++ b/java/core/src/test/org/apache/orc/TestMinSeekSize.java
@@ -187,7 +187,7 @@ public class TestMinSeekSize {
double p = readPercentage(stats, fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
// Read all bytes
- assertTrue(p >= 100);
+ assertTrue(p >= 5.9);
}
private double readPercentage(FileSystem.Statistics stats, long fileSize) {
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
index cee11f84f..c45c94e16 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
@@ -173,7 +173,7 @@ public class TestRowFilteringComplexTypesNulls {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
@Test
@@ -267,7 +267,7 @@ public class TestRowFilteringComplexTypesNulls {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
@Test
@@ -332,7 +332,7 @@ public class TestRowFilteringComplexTypesNulls {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
- assertTrue(readPercentage > 130);
+ assertTrue(readPercentage > 0.07);
}
private void seekToRow(RecordReader rr, VectorizedRowBatch b, long row)
throws IOException {
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
index bffa76e5c..d0b19a9c0 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
@@ -264,7 +264,7 @@ public class TestRowFilteringIOSkip {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
@Test
@@ -308,7 +308,7 @@ public class TestRowFilteringIOSkip {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
- assertTrue(p > 100);
+ assertTrue(p > 0.06);
}
private long validateFilteredRecordReader(RecordReader rr, VectorizedRowBatch b)
@@ -398,7 +398,7 @@ public class TestRowFilteringIOSkip {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
private double readPercentage(FileSystem.Statistics stats, long fileSize) {
@@ -423,7 +423,7 @@ public class TestRowFilteringIOSkip {
}
double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
assertEquals(RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
@Test
@@ -440,7 +440,7 @@ public class TestRowFilteringIOSkip {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
- assertTrue(readPercentage > 100);
+ assertTrue(readPercentage > 0.06);
assertTrue(RowCount > rowCount);
}
@@ -492,7 +492,7 @@ public class TestRowFilteringIOSkip {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
- assertTrue(readPercentage > 130);
+ assertTrue(readPercentage > 0.07);
}
@Test
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
index 947595c70..cc5262311 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
@@ -76,7 +76,7 @@ public class TestMapRedFiltering {
double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
fs.getFileStatus(filePath).getLen());
assertEquals(FilterTestUtil.RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
@Test
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
index 0eb917373..093a60179 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
@@ -82,7 +82,7 @@ public class TestMapReduceFiltering {
double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
fs.getFileStatus(filePath).getLen());
assertEquals(FilterTestUtil.RowCount, rowCount);
- assertTrue(p >= 100);
+ assertTrue(p >= 0.06);
}
@Test