[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556109#comment-17556109
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900994027


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -46,12 +46,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;

Review Comment:
   I guess the IDE did that, but let's not use a wildcard import here.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing

Review Comment:
   I understand we want applications to provide their own implementations, but
   can you share why we chose a cached thread pool rather than a fixed one as the
   default? I feel that many Parquet user scenarios have unpredictable execution
   times, so we need better control over the program's resource consumption.
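
A rough sketch of the bounded alternative the question points at, using only `java.util.concurrent`. The class name `BoundedIoPool`, the thread-name prefix, and the choice of `CallerRunsPolicy` are illustrative assumptions and are not part of the PR:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of parquet-mr.
final class BoundedIoPool {
  // Builds a pool with a hard cap on threads and on queued tasks.
  static ThreadPoolExecutor create(int maxThreads, int queueCapacity) {
    AtomicInteger seq = new AtomicInteger();
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "parquet-io-" + seq.incrementAndGet());
      t.setDaemon(true); // do not keep the JVM alive for idle IO threads
      return t;
    };
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
      maxThreads, maxThreads,
      60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(queueCapacity),
      factory,
      // When the queue is full, the submitting thread runs the task itself,
      // which throttles producers instead of growing the pool without bound.
      new ThreadPoolExecutor.CallerRunsPolicy());
    pool.allowCoreThreadTimeOut(true); // let idle threads exit after 60 seconds
    return pool;
  }
}
```

Unlike `Executors.newCachedThreadPool`, which creates a new thread whenever no idle one is available, this caps both the thread count and the backlog.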



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1387,8 +1489,13 @@ public void close() throws IOException {
    * result of the column-index based filtering when some pages might be skipped at reading.
    */
   private class ChunkListBuilder {
+    // ChunkData is backed by either a list of buffers or a list of strams

Review Comment:
   typo? streams? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,

Review Comment:
   The 'dictionaryPageAAD' is only used in toString()? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -659,6 +696,8 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options)
   }
 
   protected final SeekableInputStream f;
+  // input streams opened in the async mode
+  protected final List<SeekableInputStream> inputStreamList = new ArrayList<>();

Review Comment:
   should we call it inputStream? or just streams? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
       if (f != null) {
         f.close();
       }
+      if (this.currentRowGroup != null) {
+        this.currentRowGroup.close();
+      }
+      for (SeekableInputStream is : inputStreamList) {
+        is.close();
+      }
     } finally {
+      //TODO: make sure that any outstanding io tasks submitted by this reader have been cancelled.

Review Comment:
   Let's not add tech debt; let's add the verification in this PR.
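
One way such a verification could look, as a minimal sketch: the reader records every `Future` it submits and cancels the unfinished ones on close. `TrackedTasks` and its methods are hypothetical names, not code from the PR:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical helper, not part of parquet-mr.
final class TrackedTasks implements AutoCloseable {
  private final Queue<Future<?>> outstanding = new ConcurrentLinkedQueue<>();
  private final ExecutorService pool;

  TrackedTasks(ExecutorService pool) {
    this.pool = pool;
  }

  // Submits a task and remembers its Future so it can be cancelled later.
  Future<?> submit(Runnable task) {
    Future<?> f = pool.submit(task);
    outstanding.add(f);
    return f;
  }

  // Cancels every tracked task that has not finished; returns how many were cancelled.
  int cancelOutstanding() {
    int cancelled = 0;
    Future<?> f;
    while ((f = outstanding.poll()) != null) {
      if (!f.isDone() && f.cancel(true)) {
        cancelled++;
      }
    }
    return cancelled;
  }

  // Leaves the shared pool running; only this owner's tasks are cancelled.
  @Override
  public void close() {
    cancelOutstanding();
  }
}
```

The shared executor is deliberately not shut down here, since in the PR the pools are static and shared across readers.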



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  private boolean isAsyncIOReaderEnabled(){
+    if (options.isAsyncIOReaderEnabled() ) {
+      if (ioThreadPool != null) {
+        return true;
+      } else {
+        LOG.warn("Parquet async IO is configured but the IO thread pool has not been " +
+          "initialized. Configuration is being ignored");
+      }
+    }
+    return false;
+  }
+
+  private boolean isParallelColumnReaderEnabled(){
+    if (options.isParallelColumnReaderEnabled() ) {
+      if (processThreadPool != null) {
+        return true;
+      } else {
+        LOG.warn("Parallel column reading is configured but the process thread pool has " +

Review Comment:
   Same comment as above 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-process"));
+
+  public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
+    if (ioThreadPool != null && shutdownCurrent) {
+      ioThreadPool.shutdownNow();
+    }
+    ioThreadPool = ioPool;

Review Comment:
   Can we check if 'ioPool' is null? In cases of null, is it valid? 
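
If null turns out not to be a valid argument (an assumption; the question above is still open), the setter could reject it up front. A minimal sketch with a hypothetical `PoolHolder` class standing in for the static field on `ParquetFileReader`:

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for the static pool field; not part of parquet-mr.
final class PoolHolder {
  private static volatile ExecutorService ioThreadPool;

  static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
    // Fail fast instead of silently disabling async IO with a null pool.
    Objects.requireNonNull(ioPool, "ioPool must not be null");
    ExecutorService previous = ioThreadPool;
    ioThreadPool = ioPool;
    // Never shut down the pool that was just installed.
    if (shutdownCurrent && previous != null && previous != ioPool) {
      previous.shutdownNow();
    }
  }

  static ExecutorService current() {
    return ioThreadPool;
  }
}
```

If null is instead meant as "disable async IO", that would be better expressed as a separate, explicitly named method.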



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  private boolean isAsyncIOReaderEnabled(){
+    if (options.isAsyncIOReaderEnabled() ) {
+      if (ioThreadPool != null) {
+        return true;
+      } else {
+        LOG.warn("Parquet async IO is configured but the IO thread pool has not been " +

Review Comment:
   isAsyncIOReaderEnabled() is called at page level if I am correct. In a 
large-scale system, this log could generate too much data.
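
A common way to keep such a warning from flooding the logs is to emit it only once. A minimal sketch, assuming a hypothetical `WarnOnce` helper whose sink stands in for the real logger call:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper, not part of parquet-mr.
final class WarnOnce {
  private final AtomicBoolean warned = new AtomicBoolean(false);
  private final Consumer<String> sink; // e.g. a method reference to the logger's warn method

  WarnOnce(Consumer<String> sink) {
    this.sink = sink;
  }

  // Emits the message on the first call only; returns true if it was emitted.
  boolean warn(String message) {
    // compareAndSet guarantees a single winner even when many threads race here.
    if (warned.compareAndSet(false, true)) {
      sink.accept(message);
      return true;
    }
    return false;
  }
}
```

Evaluating the configuration once in the constructor and caching the boolean would avoid the repeated check altogether.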



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -937,6 +1003,10 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
       ColumnPath pathKey = mc.getPath();
       ColumnDescriptor columnDescriptor = paths.get(pathKey);
       if (columnDescriptor != null) {
+        // If async, we need a new stream for every column

Review Comment:
   Make the comment more specific: "If async IO or parallel reader feature is
   enabled, we need a new stream for every column"



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();

Review Comment:
   Where do we use 'numProcessors'?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-process"));
+
+  public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
+    if (ioThreadPool != null && shutdownCurrent) {
+      ioThreadPool.shutdownNow();
+    }
+    ioThreadPool = ioPool;
+  }
+
+  public static void setAsyncProcessThreadPool(ExecutorService processPool, boolean shutdownCurrent) {
+    if (processThreadPool != null && shutdownCurrent) {
+      processThreadPool.shutdownNow();
+    }
+    processThreadPool = processPool;
+  }
+
+  public static void shutdownAsyncIOThreadPool() {
+    ioThreadPool.shutdownNow();
+  }
+
+  public static void shutdownAsyncProcessThreadPool() {
+    processThreadPool.shutdownNow();

Review Comment:
   Same as above



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-process"));
+
+  public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
+    if (ioThreadPool != null && shutdownCurrent) {
+      ioThreadPool.shutdownNow();
+    }
+    ioThreadPool = ioPool;
+  }
+
+  public static void setAsyncProcessThreadPool(ExecutorService processPool, boolean shutdownCurrent) {
+    if (processThreadPool != null && shutdownCurrent) {
+      processThreadPool.shutdownNow();
+    }
+    processThreadPool = processPool;
+  }
+
+  public static void shutdownAsyncIOThreadPool() {
+    ioThreadPool.shutdownNow();

Review Comment:
   Should we set 'ioThreadPool = null'?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc
         }
       }
     }
+    String mode;
     // actually read all the chunks
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      consecutiveChunks.readAll(f, builder);
+    if (isAsyncIOReaderEnabled()) {
+        mode = "ASYNC";

Review Comment:
   I know it is only a short string, but it seems to be used only in the debug log.
   Check whether debug logging is enabled first, and only then set it.
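
The guard being asked for can be sketched as follows. To stay self-contained this uses `java.util.logging` as a stand-in, which is an assumption; with SLF4J, which the `{}` placeholders in the diff suggest, the equivalent check is `LOG.isDebugEnabled()`. `DebugGuard` and its members are hypothetical:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration, not part of parquet-mr.
final class DebugGuard {
  private static final Logger LOG = Logger.getLogger("DebugGuard");

  // Counts how often the debug-only value is computed (for demonstration only).
  static int modeComputations = 0;

  static String describeMode(boolean async) {
    modeComputations++;
    return async ? "ASYNC" : "SYNC";
  }

  static void logRead(boolean async, String chunk) {
    // Compute the debug-only value only when it will actually be logged.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine(describeMode(async) + ": READING Consecutive chunk: " + chunk);
    }
  }
}
```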



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc
         }
       }
     }
+    String mode;
     // actually read all the chunks
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      consecutiveChunks.readAll(f, builder);
+    if (isAsyncIOReaderEnabled()) {
+        mode = "ASYNC";
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        SeekableInputStream is = file.newStream();
+        LOG.debug("{}: READING Consecutive chunk: {}", mode, consecutiveChunks);
+        consecutiveChunks.readAll(is, builder);
+        inputStreamList.add(is);
+      }
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        mode = "SYNC";

Review Comment:
   Same as above



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
       if (f != null) {
         f.close();
       }
+      if (this.currentRowGroup != null) {
+        this.currentRowGroup.close();

Review Comment:
   You want to set it to null? 
   
   this.currentRowGroup = null



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
       if (f != null) {
         f.close();

Review Comment:
   If f.close() throws an exception, we don't close this.currentRowGroup and the
   streams in inputStreamList?
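
One way to make sure every resource is closed even when an earlier `close()` throws is to collect the failures and rethrow the first one at the end. A minimal sketch; `CloseAll` is a hypothetical helper, not code from the PR:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Hypothetical helper, not part of parquet-mr.
final class CloseAll {
  // Closes every resource; the first IOException is rethrown after all have been
  // attempted, with any later failures attached as suppressed exceptions.
  static void closeAll(List<? extends Closeable> resources) throws IOException {
    IOException first = null;
    for (Closeable c : resources) {
      if (c == null) {
        continue;
      }
      try {
        c.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

In the reader's `close()`, the list would hold `f`, the current row group, and the streams in `inputStreamList`, in that order.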



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage,
+          rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+
+
+    public DictionaryPage getDictionaryPage(){
+      return this.dictionaryPage;
+    }
+
+    public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+      return this.pagesInChunk;
+    }
+
+    void readAllRemainingPagesAsync() {
+      readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+    }
+
+    void readAllRemainingPages() throws IOException {
+      while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+        readOnePage();
+      }
+      if (chunk.offsetIndex == null && valuesCountReadSoFar != chunk.descriptor.metadata.getValueCount()) {
+        // Would be nice to have a CorruptParquetFileException or something as a subclass?
+        throw new IOException(
+            "Expected " + chunk.descriptor.metadata.getValueCount() + " values in column chunk at " +
+            getPath() + " offset " + chunk.descriptor.metadata.getFirstDataPageOffset() +
+            " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+            + " pages ending at file offset " + (chunk.descriptor.fileOffset + chunk.stream.position()));
+      }
+      try {
+        pagesInChunk.put(Optional.empty()); // add a marker for end of data
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while reading page data", e);
+      }
+    }
+
+    void readOnePage() throws IOException {
+      long startRead = System.nanoTime();
+      try {
+        byte[] pageHeaderAAD = dataPageHeaderAAD;
+        if (null != headerBlockDecryptor) {
+          // Important: this verifies file integrity (makes sure dictionary 
page had not been removed)
+          if (null == dictionaryPage && 
chunk.descriptor.metadata.hasDictionaryPage()) {
+            pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+              rowGroupOrdinal, columnOrdinal, -1);
+          } else {
+            int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+          }
+        }
+        PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        int compressedPageSize = pageHeader.getCompressed_page_size();
+        final BytesInput pageBytes;
+        switch (pageHeader.type) {
+          case DICTIONARY_PAGE:
+            // there is only one dictionary page per column chunk
+            if (dictionaryPage != null) {
+              throw new ParquetDecodingException(
+                "more than one dictionary page in column " + 
chunk.descriptor.col);
+            }
+            pageBytes = chunk.readAsBytesInput(compressedPageSize);
+            if (options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+              chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify dictionary page integrity, CRC checksum 
verification failed");
+            }
+            DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+            DictionaryPage compressedDictionaryPage =
+              new DictionaryPage(
+                pageBytes,
+                uncompressedPageSize,
+                dicHeader.getNum_values(),
+                converter.getEncoding(dicHeader.getEncoding())
+              );
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              compressedDictionaryPage.setCrc(pageHeader.getCrc());
+            }
+            dictionaryPage = compressedDictionaryPage;
+            break;
+          case DATA_PAGE:
+            DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+            pageBytes = chunk.readAsBytesInput(compressedPageSize);
+            if (null != pageBlockDecryptor) {
+              AesCipher.quickUpdatePageAAD(dataPageAAD, 
chunk.getPageOrdinal(pageIndex));
+            }
+            if (options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+              chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify page integrity, CRC checksum verification 
failed");
+            }
+            DataPageV1 dataPageV1 = new DataPageV1(
+              pageBytes,
+              dataHeaderV1.getNum_values(),
+              uncompressedPageSize,
+              converter.fromParquetStatistics(
+                getFileMetaData().getCreatedBy(),
+                dataHeaderV1.getStatistics(),
+                type),
+              converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+              converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+              converter.getEncoding(dataHeaderV1.getEncoding()));
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              dataPageV1.setCrc(pageHeader.getCrc());
+            }
+            writePage(dataPageV1);
+            valuesCountReadSoFar += dataHeaderV1.getNum_values();
+            ++dataPageCountReadSoFar;
+            pageIndex++;
+            break;
+          case DATA_PAGE_V2:
+            DataPageHeaderV2 dataHeaderV2 = 
pageHeader.getData_page_header_v2();
+            int dataSize = compressedPageSize - 
dataHeaderV2.getRepetition_levels_byte_length()
+              - dataHeaderV2.getDefinition_levels_byte_length();
+            if (null != pageBlockDecryptor) {
+              AesCipher.quickUpdatePageAAD(dataPageAAD, 
chunk.getPageOrdinal(pageIndex));
+            }
+            DataPage dataPageV2 = new DataPageV2(
+              dataHeaderV2.getNum_rows(),
+              dataHeaderV2.getNum_nulls(),
+              dataHeaderV2.getNum_values(),
+              chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+              chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+              converter.getEncoding(dataHeaderV2.getEncoding()),
+              chunk.readAsBytesInput(dataSize),
+              uncompressedPageSize,
+              converter.fromParquetStatistics(
+                getFileMetaData().getCreatedBy(),
+                dataHeaderV2.getStatistics(),
+                type),
+              dataHeaderV2.isIs_compressed()
+            );
+            writePage(dataPageV2);
+            valuesCountReadSoFar += dataHeaderV2.getNum_values();
+            ++dataPageCountReadSoFar;
+            pageIndex++;
+            break;
+          default:
+            LOG.debug("skipping page of type {} of size {}", 
pageHeader.getType(),
+              compressedPageSize);
+            chunk.stream.skipFully(compressedPageSize);
+            break;
+        }
+      } catch (Exception e) {
+        LOG.info("Exception while reading one more page for: [{} ]: {} ", 
Thread.currentThread().getName(), this);
+        throw e;
+      }
+      finally {
+        long timeSpentInRead = System.nanoTime() - startRead;
+        totalCountReadOnePage.add(1);
+        totalTimeReadOnePage.add(timeSpentInRead);
+        maxTimeReadOnePage.accumulate(timeSpentInRead);
+      }
+    }
+
+    private void writePage(DataPage page){

Review Comment:
   Let's rename it writerPageToChunk



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1455,6 +1578,8 @@ protected PageHeader readPageHeader() throws IOException {
     }
 
     protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException {
+      String mode = (isAsyncIOReaderEnabled())? "ASYNC":"SYNC";

Review Comment:
   Check whether debug logging is enabled.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,
+          rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+

Review Comment:
   remove one empty line 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1417,11 +1530,21 @@ List<Chunk> build() {
       for (Entry<ChunkDescriptor, ChunkData> entry : entries) {
         ChunkDescriptor descriptor = entry.getKey();
         ChunkData data = entry.getValue();
+        ByteBufferInputStream byteBufferInputStream;
+        if (isAsyncIOReaderEnabled()) {
+          // For async reads, we use a SequenceByeTeBufferInputStream instead of a ByteBufferInputStream
+          // because calling sliceBuffers in the latter blocks until all the buffers are read (effectively
+          // nullifying the async read) while the former blocks only if the next buffer is unavailable.
+          byteBufferInputStream = new SequenceByteBufferInputStream(data.streams);
+        } else {
+          byteBufferInputStream = ByteBufferInputStream.wrap(data.buffers);
+        }
         if (descriptor.equals(lastDescriptor)) {
           // because of a bug, the last chunk might be larger than descriptor.size
-          chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount));
+          chunks.add(new WorkaroundChunk(lastDescriptor, byteBufferInputStream, f, data.offsetIndex,

Review Comment:
   So if isAsyncIOReaderEnabled() is not true, we still pass a 
byteBufferInputStream here? At line 1497, your comment says that only the 
asynchronous path sets the stream.
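
   To illustrate the blocking behaviour that the code comment in this hunk 
describes, here is a minimal sketch. It is not the PR's implementation: 
SequenceByteBufferInputStream is introduced by the PR and its internals are not 
shown here, so the sketch uses the JDK's java.io.SequenceInputStream as a 
stand-in. The names LazySequenceSketch, lazyConcat and readChunk are 
hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LazySequenceSketch {

  // Hypothetical helper (not part of the PR): concatenates buffers that are
  // filled asynchronously, waiting for each one only when it is actually needed.
  static InputStream lazyConcat(List<CompletableFuture<byte[]>> pending) {
    final Iterator<CompletableFuture<byte[]>> it = pending.iterator();
    return new SequenceInputStream(new Enumeration<InputStream>() {
      @Override
      public boolean hasMoreElements() {
        return it.hasNext();
      }

      @Override
      public InputStream nextElement() {
        // join() blocks only until this one buffer has been filled.
        return new ByteArrayInputStream(it.next().join());
      }
    });
  }

  // Reads up to len bytes from the stream and returns them as a string
  // (empty string at end of stream).
  static String readChunk(InputStream in, int len) {
    try {
      byte[] buf = new byte[len];
      int n = in.read(buf, 0, len);
      return n <= 0 ? "" : new String(buf, 0, n, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<byte[]> first =
        CompletableFuture.completedFuture("abc".getBytes(StandardCharsets.UTF_8));
    CompletableFuture<byte[]> second = new CompletableFuture<>(); // not filled yet

    InputStream in = lazyConcat(Arrays.asList(first, second));
    System.out.println(readChunk(in, 3)); // served from the first buffer alone

    second.complete("def".getBytes(StandardCharsets.UTF_8)); // simulated late I/O
    System.out.println(readChunk(in, 3)); // now the second buffer is available
  }
}
```

   The first read returns without waiting for the second buffer. A wrapper that 
needs every buffer up front would instead have to wait for all of them before 
the first byte could be consumed.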



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state

Review Comment:
   I like the idea of categorizing the variables!



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage,
+          rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+
+
+    public DictionaryPage getDictionaryPage(){
+      return this.dictionaryPage;
+    }
+
+    public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+      return this.pagesInChunk;
+    }
+
+    void readAllRemainingPagesAsync() {
+      readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+    }
+
+    void readAllRemainingPages() throws IOException {
+      while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+        readOnePage();
+      }
+      if (chunk.offsetIndex == null && valuesCountReadSoFar != chunk.descriptor.metadata.getValueCount()) {
+        // Would be nice to have a CorruptParquetFileException or something as a subclass?
+        throw new IOException(
+            "Expected " + chunk.descriptor.metadata.getValueCount() + " values in column chunk at " +
+            getPath() + " offset " + chunk.descriptor.metadata.getFirstDataPageOffset() +
+            " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+            + " pages ending at file offset " + (chunk.descriptor.fileOffset + chunk.stream.position()));
+      }
+      try {
+        pagesInChunk.put(Optional.empty()); // add a marker for end of data
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while reading page data", e);
+      }
+    }
+
+    void readOnePage() throws IOException {
+      long startRead = System.nanoTime();
+      try {
+        byte[] pageHeaderAAD = dataPageHeaderAAD;
+        if (null != headerBlockDecryptor) {
+          // Important: this verifies file integrity (makes sure dictionary page had not been removed)
+          if (null == dictionaryPage && chunk.descriptor.metadata.hasDictionaryPage()) {
+            pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader,
+              rowGroupOrdinal, columnOrdinal, -1);
+          } else {
+            int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+          }
+        }
+        PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD);
+        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        int compressedPageSize = pageHeader.getCompressed_page_size();
+        final BytesInput pageBytes;
+        switch (pageHeader.type) {
+          case DICTIONARY_PAGE:
+            // there is only one dictionary page per column chunk
+            if (dictionaryPage != null) {
+              throw new ParquetDecodingException(
+                "more than one dictionary page in column " + chunk.descriptor.col);
+            }
+            pageBytes = chunk.readAsBytesInput(compressedPageSize);
+            if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+              chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify dictionary page integrity, CRC checksum verification failed");
+            }
+            DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+            DictionaryPage compressedDictionaryPage =
+              new DictionaryPage(
+                pageBytes,
+                uncompressedPageSize,
+                dicHeader.getNum_values(),
+                converter.getEncoding(dicHeader.getEncoding())
+              );
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              compressedDictionaryPage.setCrc(pageHeader.getCrc());
+            }
+            dictionaryPage = compressedDictionaryPage;

Review Comment:
   What is the reason a temp variable 'compressedDictionaryPage' is used? I 
guess the compiler will remove it anyway though. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage,

Review Comment:
   @ggershinsky Can you review all the decryption-related changes? 





> Implement async IO for Parquet file reader
> ------------------------------------------
>
>                 Key: PARQUET-2149
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2149
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Parth Chandra
>            Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.
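
The flow proposed in the issue description can be sketched with a blocking 
queue and an end-of-data marker, similar in spirit to the pagesInChunk deque in 
the PR. This is a minimal sketch under stated assumptions, not the PR's code: 
the class AsyncPageQueueSketch and the method readAll are hypothetical, strings 
stand in for pages, and the single-thread fixed pool is an arbitrary choice for 
the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

public class AsyncPageQueueSketch {

  // Hypothetical stand-in for the reader: a producer task pushes "pages" into
  // a queue while the caller consumes them as soon as they arrive.
  static List<String> readAll(List<String> pagesOnStorage) {
    final LinkedBlockingDeque<Optional<String>> pagesInChunk = new LinkedBlockingDeque<>();
    // Assumption for the sketch: a fixed pool, so the thread count is bounded.
    ExecutorService pool = Executors.newFixedThreadPool(1);
    try {
      Future<?> producer = pool.submit(() -> {
        try {
          for (String page : pagesOnStorage) {
            pagesInChunk.put(Optional.of(page)); // simulated storage read
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          // End-of-data marker, always sent so the consumer cannot hang.
          pagesInChunk.offer(Optional.empty());
        }
      });

      List<String> consumed = new ArrayList<>();
      while (true) {
        // Blocks only until the next page (or the marker) is available.
        Optional<String> next = pagesInChunk.take();
        if (!next.isPresent()) {
          break;
        }
        consumed.add(next.get()); // downstream decompression/decoding would go here
      }
      producer.get(); // surface any failure from the producer task
      return consumed;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while reading pages", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Producer task failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // prints [page-0, page-1, page-2]
    System.out.println(readAll(Arrays.asList("page-0", "page-1", "page-2")));
  }
}
```

The consumer starts working on the first page while later pages are still 
being read, which is the overlap between I/O and downstream processing that the 
issue asks for.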



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
