[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539724#comment-17539724 ]
ASF GitHub Bot commented on PARQUET-2149: ----------------------------------------- kazuyukitanimura commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r877368066 ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ########## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; +import org.apache.parquet.ParquetRuntimeException; Review Comment: Also I am not sure if `ParquetRuntimeException` is used... ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ########## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; Review Comment: I can't seem to find `Iterator` is used... Should we remove if it is the case? ########## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + final SeekableInputStream fileInputStream; + int fetchIndex = 0; + int readIndex = 0; + ExecutorService threadPool; + LinkedBlockingQueue<Future<Void>> readFutures; + boolean closed = false; + + LongAdder totalTimeBlocked = new LongAdder(); + LongAdder totalCountBlocked = new LongAdder(); + LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); Review Comment: Does it make sense to change these to `private` if they are not accessed from other places? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ########## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Queue; Review Comment: I think `Queue` is no longer used with this change. Should we remove the import? > Implement async IO for Parquet file reader > ------------------------------------------ > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr > Reporter: Parth Chandra > Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)