[ https://issues.apache.org/jira/browse/PARQUET-2171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788999#comment-17788999 ]

ASF GitHub Bot commented on PARQUET-2171:
-----------------------------------------

wgtmac commented on code in PR #1139:
URL: https://github.com/apache/parquet-mr/pull/1139#discussion_r1402925218


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -125,6 +130,8 @@ public class ParquetFileReader implements Closeable {
 
  public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300;

Review Comment:
   Any explanation for the magic number 300 here? Should it be configurable?
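
   If it were made configurable, one way this could look is sketched below purely as an illustration; the property key and the helper class are hypothetical and not part of this PR.

   ```java
   // Illustrative sketch only: resolve the vectored-read timeout from the
   // Hadoop Configuration instead of hard-coding 300 seconds.
   // The property key and this helper class are hypothetical, not part of the PR.
   import org.apache.hadoop.conf.Configuration;

   public final class VectoredReadTimeoutSketch {
     public static final String TIMEOUT_SECONDS_KEY =
         "parquet.hadoop.vectored.io.timeout.seconds";   // hypothetical key
     public static final long TIMEOUT_SECONDS_DEFAULT = 300L;

     private VectoredReadTimeoutSketch() {}

     /** Returns the configured timeout in seconds, falling back to the 300s default. */
     public static long vectoredReadTimeoutSeconds(Configuration conf) {
       return conf.getLong(TIMEOUT_SECONDS_KEY, TIMEOUT_SECONDS_DEFAULT);
     }
   }
   ```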



##########
parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java:
##########
@@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
             .buildChecked();
       } catch (NoSuchMethodException e) {
         // not the right implementation
+        LOG.debug("failed to load constructor arity {} from class {}",
+          argClasses.length, className, e);
+

Review Comment:
   Please remove the blank line.



##########
parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java:
##########
@@ -91,6 +92,7 @@ public Builder(Configuration conf, Path filePath) {
       super(new HadoopParquetConfiguration(conf));
       this.conf = conf;
       this.filePath = filePath;
+

Review Comment:
   ditto



##########
parquet-hadoop/README.md:
##########
@@ -501,3 +501,11 @@ If `false`, key material is stored in separate new files, created in the same fo
 **Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits.  
 **Default value:** `128`
 
+---
+
+**Property:** `parquet.hadoop.vectored.io.enabled`  

Review Comment:
   Thanks for adding the doc!



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.hadoop.util.vectorio;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.util.DynMethods;
+
+/**
+ * Binding utils.
+ */
+public final class BindingUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);
+

Review Comment:
   ```suggestion
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1063,6 +1068,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
     return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
   }
 
+  /**
+   * Read data in all parts via either vectored IO or serial IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+    throws IOException {
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+      && f.readVectoredAvailable()
+      && partsLengthValidForVectoredIO(allParts);
+    if (isVectoredIO) {
+      readVectored(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        consecutiveChunks.readAll(f, builder);
+      }
+    }
+  }
+
+  /**
+   * Vectored IO doesn't support reading ranges of size greater than
+   * Integer.MAX_VALUE.
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart :  allParts) {
+      if (consecutivePart.length >= Integer.MAX_VALUE) {
+        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus 
disabling vectored IO", consecutivePart.length);

Review Comment:
   LOG.warn ?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1063,6 +1068,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
     return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
   }
 
+  /**
+   * Read data in all parts via either vectored IO or serial IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+    throws IOException {
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+      && f.readVectoredAvailable()
+      && partsLengthValidForVectoredIO(allParts);
+    if (isVectoredIO) {
+      readVectored(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        consecutiveChunks.readAll(f, builder);
+      }
+    }
+  }
+
+  /**
+   * Vectored IO doesn't support reading ranges of size greater than
+   * Integer.MAX_VALUE.
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart :  allParts) {
+      if (consecutivePart.length >= Integer.MAX_VALUE) {
+        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus 
disabling vectored IO", consecutivePart.length);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Read all parts through vectored IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readVectored(List<ConsecutivePartList> allParts,
+    ChunkListBuilder builder) throws IOException {
+
+    List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
+        "Invalid length %s for vectored read operation. It must be less than 
max integer value.",
+        consecutiveChunks.length);
+      ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
+    }
+    LOG.debug("Doing vectored IO for ranges {}", ranges);
+    ByteBufferAllocator allocator = options.getAllocator();
+    //blocking or asynchronous vectored read.
+    f.readVectored(ranges, allocator::allocate);
+    int k = 0;
+    for (ConsecutivePartList consecutivePart :  allParts) {

Review Comment:
   ```suggestion
       for (ConsecutivePartList consecutivePart : allParts) {
   ```



##########
parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java:
##########
@@ -105,4 +107,21 @@ public abstract class SeekableInputStream extends InputStream {
    */
   public abstract void readFully(ByteBuffer buf) throws IOException;
 
+  /**
+   * Read a set of file ranges in a vectored manner.

Review Comment:
   It would be good to say what to expect if we have overlapping ranges.
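
   A possible way to document that contract is sketched below, purely as an illustration; the stated behaviour is an assumption, and the actual semantics depend on the underlying Hadoop implementation.

   ```java
     /**
      * Read a set of file ranges in a vectored manner.
      * <p>
      * Hypothetical contract note (not the PR's actual wording): the supplied
      * ranges are expected to be non-overlapping and sorted by offset;
      * implementations that delegate to Hadoop vectored IO may reject
      * overlapping ranges with an IllegalArgumentException rather than
      * coalescing them.
      */
   ```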



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1063,6 +1068,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
     return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
   }
 
+  /**
+   * Read data in all parts via either vectored IO or serial IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
+    throws IOException {
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+      && f.readVectoredAvailable()
+      && partsLengthValidForVectoredIO(allParts);
+    if (isVectoredIO) {
+      readVectored(allParts, builder);
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        consecutiveChunks.readAll(f, builder);
+      }
+    }
+  }
+
+  /**
+   * Vectored IO doesn't support reading ranges of size greater than
+   * Integer.MAX_VALUE.
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart :  allParts) {
+      if (consecutivePart.length >= Integer.MAX_VALUE) {
+        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus 
disabling vectored IO", consecutivePart.length);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Read all parts through vectored IO.
+   * @param allParts all parts to be read.
+   * @param builder used to build chunk list to read the pages for the different columns.
+   * @throws IOException any IOE.
+   */
+  private void readVectored(List<ConsecutivePartList> allParts,
+    ChunkListBuilder builder) throws IOException {
+
+    List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,

Review Comment:
   You have already checked this in partsLengthValidForVectoredIO; is it necessary to check it again here?
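
   If the earlier validation is treated as authoritative, the duplicate precondition could be reduced, for example along these lines (a fragment-level sketch only, not a definitive suggestion):

   ```java
      // Sketch (assumption): rely on partsLengthValidForVectoredIO() having
      // already filtered out oversized parts, and keep only a cheap sanity
      // assertion before the narrowing cast.
      assert consecutiveChunks.length < Integer.MAX_VALUE
          : "part length " + consecutiveChunks.length + " overflows int";
      ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
   ```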





> Implement vectored IO in parquet file format
> --------------------------------------------
>
>                 Key: PARQUET-2171
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2171
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>            Reporter: Mukund Thakur
>            Priority: Major
>
> We recently added a new feature called vectored IO in Hadoop to improve 
> read performance for seek-heavy readers. Spark jobs and others that use 
> Parquet will greatly benefit from this API. Details can be found here: 
> [https://github.com/apache/hadoop/commit/e1842b2a749d79cbdc15c524515b9eda64c339d5]
> https://issues.apache.org/jira/browse/HADOOP-18103
> https://issues.apache.org/jira/browse/HADOOP-11867
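
For additional context, below is a minimal sketch of the Hadoop-side vectored read API that this feature builds on. It assumes Hadoop 3.3.5 or later, and the file path is purely illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.parquet");   // illustrative path
    try (FSDataInputStream in = FileSystem.get(conf).open(path)) {
      // Ask for two ranges in a single vectored call; the filesystem may
      // coalesce or parallelize the underlying reads.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 1024),
          FileRange.createFileRange(4096, 2048));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        // Each range completes independently via its CompletableFuture.
        ByteBuffer data = range.getData().get();
        System.out.println("read " + data.remaining() + " bytes");
      }
    }
  }
}
```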



