shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900994027


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -46,12 +46,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;

Review Comment:
   I guess the IDE did this, but let's not use a wildcard import here.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing

Review Comment:
   I understand we want applications to provide their own implementations, but 
can you share why we chose a cached thread pool instead of a fixed one as the 
default? I feel that many Parquet use cases have unpredictable execution 
times, and we need better control over the program's resource consumption.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1387,8 +1489,13 @@ public void close() throws IOException {
    * result of the column-index based filtering when some pages might be 
skipped at reading.
    */
   private class ChunkListBuilder {
+    // ChunkData is backed by either a list of buffers or a list of strams

Review Comment:
   Typo? "strams" should be "streams"?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,

Review Comment:
   The 'dictionaryPageAAD' is only used in toString()? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -659,6 +696,8 @@ public static ParquetFileReader open(InputFile file, 
ParquetReadOptions options)
   }
 
   protected final SeekableInputStream f;
+  // input streams opened in the async mode
+  protected final List<SeekableInputStream> inputStreamList = new 
ArrayList<>();

Review Comment:
   should we call it inputStream? or just streams? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
       if (f != null) {
         f.close();
       }
+      if (this.currentRowGroup != null) {
+        this.currentRowGroup.close();
+      }
+      for (SeekableInputStream is : inputStreamList) {
+        is.close();
+      }
     } finally {
+      //TODO: make sure that any outstanding io tasks submitted by this reader 
have been cancelled.

Review Comment:
   let's not add tech debt. Let's add verification in this PR. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, 
ParquetReadOptions options) throws IOEx
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  private boolean isAsyncIOReaderEnabled(){
+    if (options.isAsyncIOReaderEnabled() ) {
+      if (ioThreadPool != null) {
+        return true;
+      } else {
+        LOG.warn("Parquet async IO is configured but the IO thread pool has 
not been " +
+          "initialized. Configuration is being ignored");
+      }
+    }
+    return false;
+  }
+
+  private boolean isParallelColumnReaderEnabled(){
+    if (options.isParallelColumnReaderEnabled() ) {
+      if (processThreadPool != null) {
+        return true;
+      } else {
+        LOG.warn("Parallel column reading is configured but the process thread 
pool has " +

Review Comment:
   Same comment as above 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. 
Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own 
implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = 
Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-process"));
+
+  public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean 
shutdownCurrent) {
+    if (ioThreadPool != null && shutdownCurrent) {
+      ioThreadPool.shutdownNow();
+    }
+    ioThreadPool = ioPool;

Review Comment:
   Can we check whether 'ioPool' is null? Is null a valid value here?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, 
ParquetReadOptions options) throws IOEx
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  private boolean isAsyncIOReaderEnabled(){
+    if (options.isAsyncIOReaderEnabled() ) {
+      if (ioThreadPool != null) {
+        return true;
+      } else {
+        LOG.warn("Parquet async IO is configured but the IO thread pool has 
not been " +

Review Comment:
   isAsyncIOReaderEnabled() is called at page level if I am correct. In a 
large-scale system, this log could generate too much data.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -937,6 +1003,10 @@ private ColumnChunkPageReadStore internalReadRowGroup(int 
blockIndex) throws IOE
       ColumnPath pathKey = mc.getPath();
       ColumnDescriptor columnDescriptor = paths.get(pathKey);
       if (columnDescriptor != null) {
+        // If async, we need a new stream for every column

Review Comment:
   Make the comment more specific: "If async IO or parallel reader feature is 
enabled, we need a new stream for every column"



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();

Review Comment:
   Where do we use 'numProcessors'?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. 
Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own 
implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = 
Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-process"));
+
+  public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean 
shutdownCurrent) {
+    if (ioThreadPool != null && shutdownCurrent) {
+      ioThreadPool.shutdownNow();
+    }
+    ioThreadPool = ioPool;
+  }
+
+  public static void setAsyncProcessThreadPool(ExecutorService processPool, 
boolean shutdownCurrent) {
+    if (processThreadPool != null && shutdownCurrent) {
+      processThreadPool.shutdownNow();
+    }
+    processThreadPool = processPool;
+  }
+
+  public static void shutdownAsyncIOThreadPool() {
+    ioThreadPool.shutdownNow();
+  }
+
+  public static void shutdownAsyncProcessThreadPool() {
+    processThreadPool.shutdownNow();

Review Comment:
   Same as above



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. 
Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own 
implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = 
Executors.newCachedThreadPool(
+    r -> new Thread(r, "parquet-process"));
+
+  public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean 
shutdownCurrent) {
+    if (ioThreadPool != null && shutdownCurrent) {
+      ioThreadPool.shutdownNow();
+    }
+    ioThreadPool = ioPool;
+  }
+
+  public static void setAsyncProcessThreadPool(ExecutorService processPool, 
boolean shutdownCurrent) {
+    if (processThreadPool != null && shutdownCurrent) {
+      processThreadPool.shutdownNow();
+    }
+    processThreadPool = processPool;
+  }
+
+  public static void shutdownAsyncIOThreadPool() {
+    ioThreadPool.shutdownNow();

Review Comment:
   Should we set 'ioThreadPool = null'?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore 
internalReadFilteredRowGroup(BlockMetaData bloc
         }
       }
     }
+    String mode;
     // actually read all the chunks
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      consecutiveChunks.readAll(f, builder);
+    if (isAsyncIOReaderEnabled()) {
+        mode = "ASYNC";

Review Comment:
   I know it is only a short string, but it seems to be used only in the debug 
log. Check whether debug logging is enabled first, and only then set it.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore 
internalReadFilteredRowGroup(BlockMetaData bloc
         }
       }
     }
+    String mode;
     // actually read all the chunks
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      consecutiveChunks.readAll(f, builder);
+    if (isAsyncIOReaderEnabled()) {
+        mode = "ASYNC";
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        SeekableInputStream is = file.newStream();
+        LOG.debug("{}: READING Consecutive chunk: {}", mode, 
consecutiveChunks);
+        consecutiveChunks.readAll(is, builder);
+        inputStreamList.add(is);
+      }
+    } else {
+      for (ConsecutivePartList consecutiveChunks : allParts) {
+        mode = "SYNC";

Review Comment:
   Same as above



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
       if (f != null) {
         f.close();
       }
+      if (this.currentRowGroup != null) {
+        this.currentRowGroup.close();

Review Comment:
   You want to set it to null? 
   
   this.currentRowGroup = null



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
       if (f != null) {
         f.close();

Review Comment:
   If f.close() throws an exception, we don't close this.currentRowGroup and 
the streams in inputStreamList?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,
+          rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+
+
+    public DictionaryPage getDictionaryPage(){
+      return this.dictionaryPage;
+    }
+
+    public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+      return this.pagesInChunk;
+    }
+
+    void readAllRemainingPagesAsync() {
+      readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+    }
+
+    void readAllRemainingPages() throws IOException {
+      while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+        readOnePage();
+      }
+      if (chunk.offsetIndex == null && valuesCountReadSoFar != 
chunk.descriptor.metadata.getValueCount()) {
+        // Would be nice to have a CorruptParquetFileException or something as 
a subclass?
+        throw new IOException(
+            "Expected " + chunk.descriptor.metadata.getValueCount() + " values 
in column chunk at " +
+            getPath() + " offset " + 
chunk.descriptor.metadata.getFirstDataPageOffset() +
+            " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+            + " pages ending at file offset " + (chunk.descriptor.fileOffset + 
chunk.stream.position()));
+      }
+      try {
+        pagesInChunk.put(Optional.empty()); // add a marker for end of data
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while reading page data", e);
+      }
+    }
+
+    void readOnePage() throws IOException {
+      long startRead = System.nanoTime();
+      try {
+        byte[] pageHeaderAAD = dataPageHeaderAAD;
+        if (null != headerBlockDecryptor) {
+          // Important: this verifies file integrity (makes sure dictionary 
page had not been removed)
+          if (null == dictionaryPage && 
chunk.descriptor.metadata.hasDictionaryPage()) {
+            pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+              rowGroupOrdinal, columnOrdinal, -1);
+          } else {
+            int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+          }
+        }
+        PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        int compressedPageSize = pageHeader.getCompressed_page_size();
+        final BytesInput pageBytes;
+        switch (pageHeader.type) {
+          case DICTIONARY_PAGE:
+            // there is only one dictionary page per column chunk
+            if (dictionaryPage != null) {
+              throw new ParquetDecodingException(
+                "more than one dictionary page in column " + 
chunk.descriptor.col);
+            }
+            pageBytes = chunk.readAsBytesInput(compressedPageSize);
+            if (options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+              chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify dictionary page integrity, CRC checksum 
verification failed");
+            }
+            DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+            DictionaryPage compressedDictionaryPage =
+              new DictionaryPage(
+                pageBytes,
+                uncompressedPageSize,
+                dicHeader.getNum_values(),
+                converter.getEncoding(dicHeader.getEncoding())
+              );
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              compressedDictionaryPage.setCrc(pageHeader.getCrc());
+            }
+            dictionaryPage = compressedDictionaryPage;
+            break;
+          case DATA_PAGE:
+            DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+            pageBytes = chunk.readAsBytesInput(compressedPageSize);
+            if (null != pageBlockDecryptor) {
+              AesCipher.quickUpdatePageAAD(dataPageAAD, 
chunk.getPageOrdinal(pageIndex));
+            }
+            if (options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+              chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify page integrity, CRC checksum verification 
failed");
+            }
+            DataPageV1 dataPageV1 = new DataPageV1(
+              pageBytes,
+              dataHeaderV1.getNum_values(),
+              uncompressedPageSize,
+              converter.fromParquetStatistics(
+                getFileMetaData().getCreatedBy(),
+                dataHeaderV1.getStatistics(),
+                type),
+              
converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+              
converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+              converter.getEncoding(dataHeaderV1.getEncoding()));
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              dataPageV1.setCrc(pageHeader.getCrc());
+            }
+            writePage(dataPageV1);
+            valuesCountReadSoFar += dataHeaderV1.getNum_values();
+            ++dataPageCountReadSoFar;
+            pageIndex++;
+            break;
+          case DATA_PAGE_V2:
+            DataPageHeaderV2 dataHeaderV2 = 
pageHeader.getData_page_header_v2();
+            int dataSize = compressedPageSize - 
dataHeaderV2.getRepetition_levels_byte_length()
+              - dataHeaderV2.getDefinition_levels_byte_length();
+            if (null != pageBlockDecryptor) {
+              AesCipher.quickUpdatePageAAD(dataPageAAD, 
chunk.getPageOrdinal(pageIndex));
+            }
+            DataPage dataPageV2 = new DataPageV2(
+              dataHeaderV2.getNum_rows(),
+              dataHeaderV2.getNum_nulls(),
+              dataHeaderV2.getNum_values(),
+              
chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+              
chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+              converter.getEncoding(dataHeaderV2.getEncoding()),
+              chunk.readAsBytesInput(dataSize),
+              uncompressedPageSize,
+              converter.fromParquetStatistics(
+                getFileMetaData().getCreatedBy(),
+                dataHeaderV2.getStatistics(),
+                type),
+              dataHeaderV2.isIs_compressed()
+            );
+            writePage(dataPageV2);
+            valuesCountReadSoFar += dataHeaderV2.getNum_values();
+            ++dataPageCountReadSoFar;
+            pageIndex++;
+            break;
+          default:
+            LOG.debug("skipping page of type {} of size {}", 
pageHeader.getType(),
+              compressedPageSize);
+            chunk.stream.skipFully(compressedPageSize);
+            break;
+        }
+      } catch (Exception e) {
+        LOG.info("Exception while reading one more page for: [{} ]: {} ", 
Thread.currentThread().getName(), this);
+        throw e;
+      }
+      finally {
+        long timeSpentInRead = System.nanoTime() - startRead;
+        totalCountReadOnePage.add(1);
+        totalTimeReadOnePage.add(timeSpentInRead);
+        maxTimeReadOnePage.accumulate(timeSpentInRead);
+      }
+    }
+
+    private void writePage(DataPage page){

Review Comment:
   Let's rename it writePageToChunk



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1455,6 +1578,8 @@ protected PageHeader readPageHeader() throws IOException {
     }
 
     protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, 
byte[] pageHeaderAAD) throws IOException {
+      String mode = (isAsyncIOReaderEnabled())? "ASYNC":"SYNC";

Review Comment:
   Check whether debug logging is enabled.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,
+          rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+

Review Comment:
   remove one empty line 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1417,11 +1530,21 @@ List<Chunk> build() {
       for (Entry<ChunkDescriptor, ChunkData> entry : entries) {
         ChunkDescriptor descriptor = entry.getKey();
         ChunkData data = entry.getValue();
+        ByteBufferInputStream byteBufferInputStream;
+        if (isAsyncIOReaderEnabled()) {
+          // For async reads, we use a SequenceByeTeBufferInputStream instead 
of a ByteBufferInputStream
+          // because calling sliceBuffers in the latter blocks until all the 
buffers are read (effectively
+          // nullifying the async read) while the former blocks only if the 
next buffer is unavailable.
+          byteBufferInputStream = new 
SequenceByteBufferInputStream(data.streams);
+        } else {
+          byteBufferInputStream = ByteBufferInputStream.wrap(data.buffers);
+        }
         if (descriptor.equals(lastDescriptor)) {
           // because of a bug, the last chunk might be larger than 
descriptor.size
-          chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, 
data.offsetIndex, rowCount));
+          chunks.add(new WorkaroundChunk(lastDescriptor, 
byteBufferInputStream, f, data.offsetIndex,

Review Comment:
   So if isAsyncIOReaderEnabled is not true, we still use byteBufferInputStream? 
In line 1497, your comment says that only the asynchronous path sets the stream.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state

Review Comment:
   I like the idea of categorizing the variables!



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,
+          rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+
+
+    public DictionaryPage getDictionaryPage(){
+      return this.dictionaryPage;
+    }
+
+    public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+      return this.pagesInChunk;
+    }
+
+    void readAllRemainingPagesAsync() {
+      readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+    }
+
+    void readAllRemainingPages() throws IOException {
+      while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+        readOnePage();
+      }
+      if (chunk.offsetIndex == null && valuesCountReadSoFar != 
chunk.descriptor.metadata.getValueCount()) {
+        // Would be nice to have a CorruptParquetFileException or something as 
a subclass?
+        throw new IOException(
+            "Expected " + chunk.descriptor.metadata.getValueCount() + " values 
in column chunk at " +
+            getPath() + " offset " + 
chunk.descriptor.metadata.getFirstDataPageOffset() +
+            " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
+            + " pages ending at file offset " + (chunk.descriptor.fileOffset + 
chunk.stream.position()));
+      }
+      try {
+        pagesInChunk.put(Optional.empty()); // add a marker for end of data
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while reading page data", e);
+      }
+    }
+
+    void readOnePage() throws IOException {
+      long startRead = System.nanoTime();
+      try {
+        byte[] pageHeaderAAD = dataPageHeaderAAD;
+        if (null != headerBlockDecryptor) {
+          // Important: this verifies file integrity (makes sure dictionary 
page had not been removed)
+          if (null == dictionaryPage && 
chunk.descriptor.metadata.hasDictionaryPage()) {
+            pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPageHeader,
+              rowGroupOrdinal, columnOrdinal, -1);
+          } else {
+            int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+          }
+        }
+        PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, 
pageHeaderAAD);
+        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        int compressedPageSize = pageHeader.getCompressed_page_size();
+        final BytesInput pageBytes;
+        switch (pageHeader.type) {
+          case DICTIONARY_PAGE:
+            // there is only one dictionary page per column chunk
+            if (dictionaryPage != null) {
+              throw new ParquetDecodingException(
+                "more than one dictionary page in column " + 
chunk.descriptor.col);
+            }
+            pageBytes = chunk.readAsBytesInput(compressedPageSize);
+            if (options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+              chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify dictionary page integrity, CRC checksum 
verification failed");
+            }
+            DictionaryPageHeader dicHeader = 
pageHeader.getDictionary_page_header();
+            DictionaryPage compressedDictionaryPage =
+              new DictionaryPage(
+                pageBytes,
+                uncompressedPageSize,
+                dicHeader.getNum_values(),
+                converter.getEncoding(dicHeader.getEncoding())
+              );
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              compressedDictionaryPage.setCrc(pageHeader.getCrc());
+            }
+            dictionaryPage = compressedDictionaryPage;

Review Comment:
   What is the reason for using the temporary variable 'compressedDictionaryPage'? 
I guess the compiler will optimize it away anyway, though.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
     public long endPos() {
       return offset + length;
     }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    //state
+    private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new 
LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue<Future<Void>> readFutures = new 
ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+      Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+      BytesInputDecompressor decompressor
+      ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+          rowGroupOrdinal,
+          columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+      }
+      if (null != pageBlockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+          columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,

Review Comment:
   @ggershinsky Can you review all the decryption-related changes? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to