[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556109#comment-17556109 ]
ASF GitHub Bot commented on PARQUET-2149: ----------------------------------------- shangxinli commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900994027 ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -46,12 +46,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; Review Comment: I guess it is IDE does that but let's not use wildcard here ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing Review Comment: I understand we want applications to provide their own implementations, but can you share why we choose the cached thread pool instead of fixed in default? I kind of feel a lot of user scenarios of Parquet is with unpredictable execution times and we need better control over our program's resource consumption. ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1387,8 +1489,13 @@ public void close() throws IOException { * result of the column-index based filtering when some pages might be skipped at reading. */ private class ChunkListBuilder { + // ChunkData is backed by either a list of buffers or a list of strams Review Comment: typo? streams? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + + @Override + public String toString() { + return "ConsecutivePartList{" + + "offset=" + offset + + ", length=" + length + + ", chunks=" + chunks + + '}'; + } } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private final byte[] dictionaryPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() + .getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, Review Comment: The 'dictionaryPageAAD' is only used in toString()? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -659,6 +696,8 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options) } protected final SeekableInputStream f; + // input streams opened in the async mode + protected final List<SeekableInputStream> inputStreamList = new ArrayList<>(); Review Comment: should we call it inputStream? or just streams? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1377,7 +1472,14 @@ public void close() throws IOException { if (f != null) { f.close(); } + if (this.currentRowGroup != null) { + this.currentRowGroup.close(); + } + for (SeekableInputStream is : inputStreamList) { + is.close(); + } } finally { + //TODO: make sure that any outstanding io tasks submitted by this reader have been cancelled. Review Comment: let's not add tech debt. Let's add verification in this PR. ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } + private boolean isAsyncIOReaderEnabled(){ + if (options.isAsyncIOReaderEnabled() ) { + if (ioThreadPool != null) { + return true; + } else { + LOG.warn("Parquet async IO is configured but the IO thread pool has not been " + + "initialized. Configuration is being ignored"); + } + } + return false; + } + + private boolean isParallelColumnReaderEnabled(){ + if (options.isParallelColumnReaderEnabled() ) { + if (processThreadPool != null) { + return true; + } else { + LOG.warn("Parallel column reading is configured but the process thread pool has " + Review Comment: Same comment as above ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService ioThreadPool = Executors.newCachedThreadPool( + r -> new Thread(r, "parquet-io")); + + // Thread pool to process pages for multiple columns in parallel. Applications should call + // setAsyncProcessThreadPool to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService processThreadPool = Executors.newCachedThreadPool( + r -> new Thread(r, "parquet-process")); + + public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) { + if (ioThreadPool != null && shutdownCurrent) { + ioThreadPool.shutdownNow(); + } + ioThreadPool = ioPool; Review Comment: Can we check if 'ioPool' is null? In cases of null, is it valid? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } + private boolean isAsyncIOReaderEnabled(){ + if (options.isAsyncIOReaderEnabled() ) { + if (ioThreadPool != null) { + return true; + } else { + LOG.warn("Parquet async IO is configured but the IO thread pool has not been " + Review Comment: isAsyncIOReaderEnabled() is called at page level if I am correct. In a large-scale system, this log could generate too much data. ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -937,6 +1003,10 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE ColumnPath pathKey = mc.getPath(); ColumnDescriptor columnDescriptor = paths.get(pathKey); if (columnDescriptor != null) { + // If async, we need a new stream for every column Review Comment: Make the comments more specfic "If async IO or parallel reader feature is enabled, we need a new stream for every column" ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); Review Comment: Where we use 'numProcessors'? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService ioThreadPool = Executors.newCachedThreadPool( + r -> new Thread(r, "parquet-io")); + + // Thread pool to process pages for multiple columns in parallel. Applications should call + // setAsyncProcessThreadPool to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService processThreadPool = Executors.newCachedThreadPool( + r -> new Thread(r, "parquet-process")); + + public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) { + if (ioThreadPool != null && shutdownCurrent) { + ioThreadPool.shutdownNow(); + } + ioThreadPool = ioPool; + } + + public static void setAsyncProcessThreadPool(ExecutorService processPool, boolean shutdownCurrent) { + if (processThreadPool != null && shutdownCurrent) { + processThreadPool.shutdownNow(); + } + processThreadPool = processPool; + } + + public static void shutdownAsyncIOThreadPool() { + ioThreadPool.shutdownNow(); + } + + public static void shutdownAsyncProcessThreadPool() { + processThreadPool.shutdownNow(); Review Comment: Same as above ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService ioThreadPool = Executors.newCachedThreadPool( + r -> new Thread(r, "parquet-io")); + + // Thread pool to process pages for multiple columns in parallel. Applications should call + // setAsyncProcessThreadPool to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService processThreadPool = Executors.newCachedThreadPool( + r -> new Thread(r, "parquet-process")); + + public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) { + if (ioThreadPool != null && shutdownCurrent) { + ioThreadPool.shutdownNow(); + } + ioThreadPool = ioPool; + } + + public static void setAsyncProcessThreadPool(ExecutorService processPool, boolean shutdownCurrent) { + if (processThreadPool != null && shutdownCurrent) { + processThreadPool.shutdownNow(); + } + processThreadPool = processPool; + } + + public static void shutdownAsyncIOThreadPool() { + ioThreadPool.shutdownNow(); Review Comment: Should we set 'ioThreadPool = null'? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc } } } + String mode; // actually read all the chunks - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); + if (isAsyncIOReaderEnabled()) { + mode = "ASYNC"; Review Comment: I know it is only a short string but it seems only being used in debug log. Check if debug log enabled first and then add it. ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc } } } + String mode; // actually read all the chunks - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); + if (isAsyncIOReaderEnabled()) { + mode = "ASYNC"; + for (ConsecutivePartList consecutiveChunks : allParts) { + SeekableInputStream is = file.newStream(); + LOG.debug("{}: READING Consecutive chunk: {}", mode, consecutiveChunks); + consecutiveChunks.readAll(is, builder); + inputStreamList.add(is); + } + } else { + for (ConsecutivePartList consecutiveChunks : allParts) { + mode = "SYNC"; Review Comment: Same as above ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1377,7 +1472,14 @@ public void close() throws IOException { if (f != null) { f.close(); } + if (this.currentRowGroup != null) { + this.currentRowGroup.close(); Review Comment: You want to set it to null? this.currentRowGroup = null ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1377,7 +1472,14 @@ public void close() throws IOException { if (f != null) { f.close(); Review Comment: if f.close() throw exception, we don't close this. currentRowGroup() and streams in inputStreamList? ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + + @Override + public String toString() { + return "ConsecutivePartList{" + + "offset=" + offset + + ", length=" + length + + ", chunks=" + chunks + + '}'; + } } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private final byte[] dictionaryPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() + .getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, + rowGroupOrdinal, columnOrdinal, -1); + } else { + dataPageAAD = null; + dictionaryPageAAD = null; + } + } + + + public DictionaryPage getDictionaryPage(){ + return this.dictionaryPage; + } + + public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){ + return this.pagesInChunk; + } + + void readAllRemainingPagesAsync() { + readFutures.offer(processThreadPool.submit(new PageReaderTask(this))); + } + + void readAllRemainingPages() throws IOException { + while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + readOnePage(); + } + if (chunk.offsetIndex == null && valuesCountReadSoFar != chunk.descriptor.metadata.getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? + throw new IOException( + "Expected " + chunk.descriptor.metadata.getValueCount() + " values in column chunk at " + + getPath() + " offset " + chunk.descriptor.metadata.getFirstDataPageOffset() + + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + + " pages ending at file offset " + (chunk.descriptor.fileOffset + chunk.stream.position())); + } + try { + pagesInChunk.put(Optional.empty()); // add a marker for end of data + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + } + + void readOnePage() throws IOException { + long startRead = System.nanoTime(); + try { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not been removed) + if (null == dictionaryPage && chunk.descriptor.metadata.hasDictionaryPage()) { + pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, + rowGroupOrdinal, columnOrdinal, -1); + } else { + int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException( + "more than one dictionary page in column " + chunk.descriptor.col); + } + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + DictionaryPage compressedDictionaryPage = + new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + converter.getEncoding(dicHeader.getEncoding()) + ); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + compressedDictionaryPage.setCrc(pageHeader.getCrc()); + } + dictionaryPage = compressedDictionaryPage; + break; + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (null != pageBlockDecryptor) { + AesCipher.quickUpdatePageAAD(dataPageAAD, chunk.getPageOrdinal(pageIndex)); + } + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(), + "could not verify page integrity, CRC checksum verification failed"); + } + DataPageV1 dataPageV1 = new DataPageV1( + pageBytes, + dataHeaderV1.getNum_values(), + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), + dataHeaderV1.getStatistics(), + type), + converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), + converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()), + converter.getEncoding(dataHeaderV1.getEncoding())); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dataPageV1.setCrc(pageHeader.getCrc()); + } + writePage(dataPageV1); + valuesCountReadSoFar += dataHeaderV1.getNum_values(); + ++dataPageCountReadSoFar; + pageIndex++; + break; + case DATA_PAGE_V2: + DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); + int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() + - dataHeaderV2.getDefinition_levels_byte_length(); + if (null != pageBlockDecryptor) { + AesCipher.quickUpdatePageAAD(dataPageAAD, chunk.getPageOrdinal(pageIndex)); + } + DataPage dataPageV2 = new DataPageV2( + dataHeaderV2.getNum_rows(), + dataHeaderV2.getNum_nulls(), + dataHeaderV2.getNum_values(), + chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()), + chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()), + converter.getEncoding(dataHeaderV2.getEncoding()), + chunk.readAsBytesInput(dataSize), + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), + dataHeaderV2.getStatistics(), + type), + dataHeaderV2.isIs_compressed() + ); + writePage(dataPageV2); + valuesCountReadSoFar += dataHeaderV2.getNum_values(); + ++dataPageCountReadSoFar; + pageIndex++; + break; + default: + LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), + compressedPageSize); + chunk.stream.skipFully(compressedPageSize); + break; + } + } catch (Exception e) { + LOG.info("Exception while reading one more page for: [{} ]: {} ", Thread.currentThread().getName(), this); + throw e; + } + finally { + long timeSpentInRead = System.nanoTime() - startRead; + totalCountReadOnePage.add(1); + totalTimeReadOnePage.add(timeSpentInRead); + maxTimeReadOnePage.accumulate(timeSpentInRead); + } + } + + private void writePage(DataPage page){ Review Comment: Let's rename it writerPageToChunk ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1455,6 +1578,8 @@ protected PageHeader readPageHeader() throws IOException { } protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException { + String mode = (isAsyncIOReaderEnabled())? "ASYNC":"SYNC"; Review Comment: Check if debug log enabled ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + + @Override + public String toString() { + return "ConsecutivePartList{" + + "offset=" + offset + + ", length=" + length + + ", chunks=" + chunks + + '}'; + } } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private final byte[] dictionaryPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() + .getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, + rowGroupOrdinal, columnOrdinal, -1); + } else { + dataPageAAD = null; + dictionaryPageAAD = null; + } + } + Review Comment: remove one empty line ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1417,11 +1530,21 @@ List<Chunk> build() { for (Entry<ChunkDescriptor, ChunkData> entry : entries) { ChunkDescriptor descriptor = entry.getKey(); ChunkData data = entry.getValue(); + ByteBufferInputStream byteBufferInputStream; + if (isAsyncIOReaderEnabled()) { + // For async reads, we use a SequenceByeTeBufferInputStream instead of a ByteBufferInputStream + // because calling sliceBuffers in the latter blocks until all the buffers are read (effectively + // nullifying the async read) while the former blocks only if the next buffer is unavailable. + byteBufferInputStream = new SequenceByteBufferInputStream(data.streams); + } else { + byteBufferInputStream = ByteBufferInputStream.wrap(data.buffers); + } if (descriptor.equals(lastDescriptor)) { // because of a bug, the last chunk might be larger than descriptor.size - chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount)); + chunks.add(new WorkaroundChunk(lastDescriptor, byteBufferInputStream, f, data.offsetIndex, Review Comment: So if isAsyncIOReaderEnabled is not true, we still byteBufferInputStream()? In line 1497, you comment that only asynchronous set the stream ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + + @Override + public String toString() { + return "ConsecutivePartList{" + + "offset=" + offset + + ", length=" + length + + ", chunks=" + chunks + + '}'; + } } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state Review Comment: Like the idea to categorize variables! ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + + @Override + public String toString() { + return "ConsecutivePartList{" + + "offset=" + offset + + ", length=" + length + + ", chunks=" + chunks + + '}'; + } } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private final byte[] dictionaryPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() + .getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, + rowGroupOrdinal, columnOrdinal, -1); + } else { + dataPageAAD = null; + dictionaryPageAAD = null; + } + } + + + public DictionaryPage getDictionaryPage(){ + return this.dictionaryPage; + } + + public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){ + return this.pagesInChunk; + } + + void readAllRemainingPagesAsync() { + readFutures.offer(processThreadPool.submit(new PageReaderTask(this))); + } + + void readAllRemainingPages() throws IOException { + while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + readOnePage(); + } + if (chunk.offsetIndex == null && valuesCountReadSoFar != chunk.descriptor.metadata.getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? + throw new IOException( + "Expected " + chunk.descriptor.metadata.getValueCount() + " values in column chunk at " + + getPath() + " offset " + chunk.descriptor.metadata.getFirstDataPageOffset() + + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + + " pages ending at file offset " + (chunk.descriptor.fileOffset + chunk.stream.position())); + } + try { + pagesInChunk.put(Optional.empty()); // add a marker for end of data + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + } + + void readOnePage() throws IOException { + long startRead = System.nanoTime(); + try { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not been removed) + if (null == dictionaryPage && chunk.descriptor.metadata.hasDictionaryPage()) { + pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, + rowGroupOrdinal, columnOrdinal, -1); + } else { + int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException( + "more than one dictionary page in column " + chunk.descriptor.col); + } + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + DictionaryPage compressedDictionaryPage = + new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + converter.getEncoding(dicHeader.getEncoding()) + ); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + compressedDictionaryPage.setCrc(pageHeader.getCrc()); + } + dictionaryPage = compressedDictionaryPage; Review Comment: What is the reason a temp variable 'compressedDictionaryPage' is used? I guess the compiler will remove it anyway though. ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ########## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + + @Override + public String toString() { + return "ConsecutivePartList{" + + "offset=" + offset + + ", length=" + length + + ", chunks=" + chunks + + '}'; + } } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + //state + private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private final byte[] dictionaryPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() + .getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); + dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, Review Comment: @ggershinsky Can you review all the decryption-related changes? > Implement async IO for Parquet file reader > ------------------------------------------ > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr > Reporter: Parth Chandra > Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)