[
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556109#comment-17556109
]
ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------
shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900994027
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -46,12 +46,11 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
Review Comment:
I guess the IDE did that, but let's not use a wildcard import here.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM =
"parquet.metadata.read.parallelism";
+ public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+ // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+ // to initialize this with their own implementations.
+ // Default initialization is useful only for testing
Review Comment:
I understand we want applications to provide their own implementations, but can
you share why we chose a cached thread pool rather than a fixed one as the
default? Many Parquet user scenarios have unpredictable execution times, and we
need better control over the program's resource consumption.
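For context, here is the kind of bounded setup an application could install
through the setters added in this PR. This is a minimal sketch only; the
AsyncReaderSetup class, method name, and pool sizes are hypothetical, not part
of the PR.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.parquet.hadoop.ParquetFileReader;

    public class AsyncReaderSetup {
      // Replace the unbounded cached defaults with bounded, fixed-size pools.
      public static void configurePools(int ioThreads, int processThreads) {
        ExecutorService ioPool = Executors.newFixedThreadPool(
            ioThreads, r -> new Thread(r, "parquet-io"));
        ExecutorService processPool = Executors.newFixedThreadPool(
            processThreads, r -> new Thread(r, "parquet-process"));
        // shutdownCurrent = true shuts down the default pools created at class load time
        ParquetFileReader.setAsyncIOThreadPool(ioPool, true);
        ParquetFileReader.setAsyncProcessThreadPool(processPool, true);
      }
    }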
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1387,8 +1489,13 @@ public void close() throws IOException {
* result of the column-index based filtering when some pages might be
skipped at reading.
*/
private class ChunkListBuilder {
+ // ChunkData is backed by either a list of buffers or a list of strams
Review Comment:
typo? streams?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f,
ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
+ private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new
LinkedBlockingDeque<>();
+ private DictionaryPage dictionaryPage = null;
+ private int pageIndex = 0;
+ private long valuesCountReadSoFar = 0;
+ private int dataPageCountReadSoFar = 0;
+
+ // derived
+ private final PrimitiveType type;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
+ private byte[] dataPageHeaderAAD = null;
+
+ private final BytesInputDecompressor decompressor;
+
+ private final ConcurrentLinkedQueue<Future<Void>> readFutures = new
ConcurrentLinkedQueue<>();
+
+ private final LongAdder totalTimeReadOnePage = new LongAdder();
+ private final LongAdder totalCountReadOnePage = new LongAdder();
+ private final LongAccumulator maxTimeReadOnePage = new
LongAccumulator(Long::max, 0L);
+ private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+ private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+ private final LongAccumulator maxTimeBlockedPagesInChunk = new
LongAccumulator(Long::max, 0L);
+
+ public PageReader(Chunk chunk, int currentBlock, Decryptor
headerBlockDecryptor,
+ Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int
columnOrdinal,
+ BytesInputDecompressor decompressor
+ ) {
+ this.chunk = chunk;
+ this.currentBlock = currentBlock;
+ this.headerBlockDecryptor = headerBlockDecryptor;
+ this.pageBlockDecryptor = pageBlockDecryptor;
+ this.aadPrefix = aadPrefix;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.decompressor = decompressor;
+
+ this.type = getFileMetaData().getSchema()
+ .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPageHeader,
+ rowGroupOrdinal,
+ columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+ }
+ if (null != pageBlockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPage, rowGroupOrdinal,
+ columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPage,
Review Comment:
The 'dictionaryPageAAD' is only used in toString()?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -659,6 +696,8 @@ public static ParquetFileReader open(InputFile file,
ParquetReadOptions options)
}
protected final SeekableInputStream f;
+ // input streams opened in the async mode
+ protected final List<SeekableInputStream> inputStreamList = new ArrayList<>();
Review Comment:
should we call it inputStream? or just streams?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
if (f != null) {
f.close();
}
+ if (this.currentRowGroup != null) {
+ this.currentRowGroup.close();
+ }
+ for (SeekableInputStream is : inputStreamList) {
+ is.close();
+ }
} finally {
+ //TODO: make sure that any outstanding io tasks submitted by this reader have been cancelled.
Review Comment:
Let's not add tech debt; let's add the verification in this PR.
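For illustration, a minimal sketch of the kind of cancellation being asked for,
assuming close() can reach the futures stored in the reader's readFutures
queues; the ReadCancellation helper is hypothetical, not part of the PR.

    import java.util.Queue;
    import java.util.concurrent.Future;

    final class ReadCancellation {
      // Cancel any submitted IO/processing tasks that have not completed yet.
      static void cancelAll(Queue<Future<Void>> readFutures) {
        Future<Void> pending;
        while ((pending = readFutures.poll()) != null) {
          pending.cancel(true); // interrupt tasks that are still queued or running
        }
      }
    }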
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file,
ParquetReadOptions options) throws IOEx
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
+ private boolean isAsyncIOReaderEnabled(){
+ if (options.isAsyncIOReaderEnabled() ) {
+ if (ioThreadPool != null) {
+ return true;
+ } else {
+ LOG.warn("Parquet async IO is configured but the IO thread pool has
not been " +
+ "initialized. Configuration is being ignored");
+ }
+ }
+ return false;
+ }
+
+ private boolean isParallelColumnReaderEnabled(){
+ if (options.isParallelColumnReaderEnabled() ) {
+ if (processThreadPool != null) {
+ return true;
+ } else {
+ LOG.warn("Parallel column reading is configured but the process thread
pool has " +
Review Comment:
Same comment as above
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM =
"parquet.metadata.read.parallelism";
+ public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+ // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+ // to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-io"));
+
+ // Thread pool to process pages for multiple columns in parallel. Applications should call
+ // setAsyncProcessThreadPool to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-process"));
+
+ public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
+ if (ioThreadPool != null && shutdownCurrent) {
+ ioThreadPool.shutdownNow();
+ }
+ ioThreadPool = ioPool;
Review Comment:
Can we check whether 'ioPool' is null? Is a null value valid here?
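For illustration, one possible shape of the guard; whether a null pool should
be rejected or treated as "disable async IO" is exactly the open question, and
java.util.Objects is assumed to be imported.

    public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
      Objects.requireNonNull(ioPool, "ioPool must not be null");
      if (ioThreadPool != null && shutdownCurrent) {
        ioThreadPool.shutdownNow();
      }
      ioThreadPool = ioPool;
    }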
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file,
ParquetReadOptions options) throws IOEx
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
+ private boolean isAsyncIOReaderEnabled(){
+ if (options.isAsyncIOReaderEnabled() ) {
+ if (ioThreadPool != null) {
+ return true;
+ } else {
+ LOG.warn("Parquet async IO is configured but the IO thread pool has
not been " +
Review Comment:
isAsyncIOReaderEnabled() is called at the page level, if I am correct. In a
large-scale system, this log could generate too much data.
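A sketch of a warn-once guard so that a per-page call cannot flood the logs;
the asyncIoWarned field is hypothetical and would require importing
java.util.concurrent.atomic.AtomicBoolean.

    private static final AtomicBoolean asyncIoWarned = new AtomicBoolean(false);

    private boolean isAsyncIOReaderEnabled() {
      if (options.isAsyncIOReaderEnabled()) {
        if (ioThreadPool != null) {
          return true;
        }
        if (asyncIoWarned.compareAndSet(false, true)) {
          LOG.warn("Parquet async IO is configured but the IO thread pool has not been "
              + "initialized. Configuration is being ignored");
        }
      }
      return false;
    }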
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -937,6 +1003,10 @@ private ColumnChunkPageReadStore internalReadRowGroup(int
blockIndex) throws IOE
ColumnPath pathKey = mc.getPath();
ColumnDescriptor columnDescriptor = paths.get(pathKey);
if (columnDescriptor != null) {
+ // If async, we need a new stream for every column
Review Comment:
Make the comment more specific: "If the async IO or parallel reader feature is
enabled, we need a new stream for every column".
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM =
"parquet.metadata.read.parallelism";
+ public static int numProcessors = Runtime.getRuntime().availableProcessors();
Review Comment:
Where do we use 'numProcessors'?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM =
"parquet.metadata.read.parallelism";
+ public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+ // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+ // to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-io"));
+
+ // Thread pool to process pages for multiple columns in parallel. Applications should call
+ // setAsyncProcessThreadPool to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-process"));
+
+ public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
+ if (ioThreadPool != null && shutdownCurrent) {
+ ioThreadPool.shutdownNow();
+ }
+ ioThreadPool = ioPool;
+ }
+
+ public static void setAsyncProcessThreadPool(ExecutorService processPool, boolean shutdownCurrent) {
+ if (processThreadPool != null && shutdownCurrent) {
+ processThreadPool.shutdownNow();
+ }
+ processThreadPool = processPool;
+ }
+
+ public static void shutdownAsyncIOThreadPool() {
+ ioThreadPool.shutdownNow();
+ }
+
+ public static void shutdownAsyncProcessThreadPool() {
+ processThreadPool.shutdownNow();
Review Comment:
Same as above
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM =
"parquet.metadata.read.parallelism";
+ public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+ // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+ // to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-io"));
+
+ // Thread pool to process pages for multiple columns in parallel. Applications should call
+ // setAsyncProcessThreadPool to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-process"));
+
+ public static void setAsyncIOThreadPool(ExecutorService ioPool, boolean shutdownCurrent) {
+ if (ioThreadPool != null && shutdownCurrent) {
+ ioThreadPool.shutdownNow();
+ }
+ ioThreadPool = ioPool;
+ }
+
+ public static void setAsyncProcessThreadPool(ExecutorService processPool, boolean shutdownCurrent) {
+ if (processThreadPool != null && shutdownCurrent) {
+ processThreadPool.shutdownNow();
+ }
+ processThreadPool = processPool;
+ }
+
+ public static void shutdownAsyncIOThreadPool() {
+ ioThreadPool.shutdownNow();
Review Comment:
Should we set 'ioThreadPool = null'?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore
internalReadFilteredRowGroup(BlockMetaData bloc
}
}
}
+ String mode;
// actually read all the chunks
- for (ConsecutivePartList consecutiveChunks : allParts) {
- consecutiveChunks.readAll(f, builder);
+ if (isAsyncIOReaderEnabled()) {
+ mode = "ASYNC";
Review Comment:
I know it is only a short string, but it seems to be used only in the debug
log. Check whether debug logging is enabled first, and only then build it.
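For example, the string construction and the debug call could be guarded
together; this is only a sketch of the suggestion, not the PR's code.

    if (LOG.isDebugEnabled()) {
      String mode = isAsyncIOReaderEnabled() ? "ASYNC" : "SYNC";
      LOG.debug("{}: READING Consecutive chunk: {}", mode, consecutiveChunks);
    }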
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1072,9 +1154,22 @@ private ColumnChunkPageReadStore
internalReadFilteredRowGroup(BlockMetaData bloc
}
}
}
+ String mode;
// actually read all the chunks
- for (ConsecutivePartList consecutiveChunks : allParts) {
- consecutiveChunks.readAll(f, builder);
+ if (isAsyncIOReaderEnabled()) {
+ mode = "ASYNC";
+ for (ConsecutivePartList consecutiveChunks : allParts) {
+ SeekableInputStream is = file.newStream();
+ LOG.debug("{}: READING Consecutive chunk: {}", mode,
consecutiveChunks);
+ consecutiveChunks.readAll(is, builder);
+ inputStreamList.add(is);
+ }
+ } else {
+ for (ConsecutivePartList consecutiveChunks : allParts) {
+ mode = "SYNC";
Review Comment:
Same as above
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
if (f != null) {
f.close();
}
+ if (this.currentRowGroup != null) {
+ this.currentRowGroup.close();
Review Comment:
Do you want to set it to null afterwards?
this.currentRowGroup = null
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1377,7 +1472,14 @@ public void close() throws IOException {
if (f != null) {
f.close();
Review Comment:
If f.close() throws an exception, we don't close this.currentRowGroup or the
streams in inputStreamList?
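One way to make close() resilient so that a failure in f.close() still releases
the row group and the extra streams, and that also nulls out currentRowGroup as
suggested in the previous comment. This is a simplified sketch; the outer
finally block of the real method is omitted.

    @Override
    public void close() throws IOException {
      try {
        if (f != null) {
          f.close();
        }
      } finally {
        try {
          if (this.currentRowGroup != null) {
            this.currentRowGroup.close();
            this.currentRowGroup = null;
          }
        } finally {
          // A failure on one stream still skips the rest here; collecting
          // suppressed exceptions would be more thorough.
          for (SeekableInputStream is : inputStreamList) {
            is.close();
          }
        }
      }
    }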
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f,
ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
+ private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new
LinkedBlockingDeque<>();
+ private DictionaryPage dictionaryPage = null;
+ private int pageIndex = 0;
+ private long valuesCountReadSoFar = 0;
+ private int dataPageCountReadSoFar = 0;
+
+ // derived
+ private final PrimitiveType type;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
+ private byte[] dataPageHeaderAAD = null;
+
+ private final BytesInputDecompressor decompressor;
+
+ private final ConcurrentLinkedQueue<Future<Void>> readFutures = new
ConcurrentLinkedQueue<>();
+
+ private final LongAdder totalTimeReadOnePage = new LongAdder();
+ private final LongAdder totalCountReadOnePage = new LongAdder();
+ private final LongAccumulator maxTimeReadOnePage = new
LongAccumulator(Long::max, 0L);
+ private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+ private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+ private final LongAccumulator maxTimeBlockedPagesInChunk = new
LongAccumulator(Long::max, 0L);
+
+ public PageReader(Chunk chunk, int currentBlock, Decryptor
headerBlockDecryptor,
+ Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int
columnOrdinal,
+ BytesInputDecompressor decompressor
+ ) {
+ this.chunk = chunk;
+ this.currentBlock = currentBlock;
+ this.headerBlockDecryptor = headerBlockDecryptor;
+ this.pageBlockDecryptor = pageBlockDecryptor;
+ this.aadPrefix = aadPrefix;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.decompressor = decompressor;
+
+ this.type = getFileMetaData().getSchema()
+ .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPageHeader,
+ rowGroupOrdinal,
+ columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+ }
+ if (null != pageBlockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPage, rowGroupOrdinal,
+ columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPage,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ dataPageAAD = null;
+ dictionaryPageAAD = null;
+ }
+ }
+
+
+ public DictionaryPage getDictionaryPage(){
+ return this.dictionaryPage;
+ }
+
+ public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+ return this.pagesInChunk;
+ }
+
+ void readAllRemainingPagesAsync() {
+ readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+ }
+
+ void readAllRemainingPages() throws IOException {
+ while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+ readOnePage();
+ }
+ if (chunk.offsetIndex == null && valuesCountReadSoFar !=
chunk.descriptor.metadata.getValueCount()) {
+ // Would be nice to have a CorruptParquetFileException or something as
a subclass?
+ throw new IOException(
+ "Expected " + chunk.descriptor.metadata.getValueCount() + " values
in column chunk at " +
+ getPath() + " offset " +
chunk.descriptor.metadata.getFirstDataPageOffset() +
+ " but got " + valuesCountReadSoFar + " values instead over " +
pagesInChunk.size()
+ + " pages ending at file offset " + (chunk.descriptor.fileOffset +
chunk.stream.position()));
+ }
+ try {
+ pagesInChunk.put(Optional.empty()); // add a marker for end of data
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while reading page data", e);
+ }
+ }
+
+ void readOnePage() throws IOException {
+ long startRead = System.nanoTime();
+ try {
+ byte[] pageHeaderAAD = dataPageHeaderAAD;
+ if (null != headerBlockDecryptor) {
+ // Important: this verifies file integrity (makes sure dictionary
page had not been removed)
+ if (null == dictionaryPage &&
chunk.descriptor.metadata.hasDictionaryPage()) {
+ pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPageHeader,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
+ }
+ PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor,
pageHeaderAAD);
+ int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+ int compressedPageSize = pageHeader.getCompressed_page_size();
+ final BytesInput pageBytes;
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ // there is only one dictionary page per column chunk
+ if (dictionaryPage != null) {
+ throw new ParquetDecodingException(
+ "more than one dictionary page in column " +
chunk.descriptor.col);
+ }
+ pageBytes = chunk.readAsBytesInput(compressedPageSize);
+ if (options.usePageChecksumVerification() &&
pageHeader.isSetCrc()) {
+ chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify dictionary page integrity, CRC checksum
verification failed");
+ }
+ DictionaryPageHeader dicHeader =
pageHeader.getDictionary_page_header();
+ DictionaryPage compressedDictionaryPage =
+ new DictionaryPage(
+ pageBytes,
+ uncompressedPageSize,
+ dicHeader.getNum_values(),
+ converter.getEncoding(dicHeader.getEncoding())
+ );
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ compressedDictionaryPage.setCrc(pageHeader.getCrc());
+ }
+ dictionaryPage = compressedDictionaryPage;
+ break;
+ case DATA_PAGE:
+ DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+ pageBytes = chunk.readAsBytesInput(compressedPageSize);
+ if (null != pageBlockDecryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD,
chunk.getPageOrdinal(pageIndex));
+ }
+ if (options.usePageChecksumVerification() &&
pageHeader.isSetCrc()) {
+ chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify page integrity, CRC checksum verification
failed");
+ }
+ DataPageV1 dataPageV1 = new DataPageV1(
+ pageBytes,
+ dataHeaderV1.getNum_values(),
+ uncompressedPageSize,
+ converter.fromParquetStatistics(
+ getFileMetaData().getCreatedBy(),
+ dataHeaderV1.getStatistics(),
+ type),
+
converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+
converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getEncoding()));
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ dataPageV1.setCrc(pageHeader.getCrc());
+ }
+ writePage(dataPageV1);
+ valuesCountReadSoFar += dataHeaderV1.getNum_values();
+ ++dataPageCountReadSoFar;
+ pageIndex++;
+ break;
+ case DATA_PAGE_V2:
+ DataPageHeaderV2 dataHeaderV2 =
pageHeader.getData_page_header_v2();
+ int dataSize = compressedPageSize -
dataHeaderV2.getRepetition_levels_byte_length()
+ - dataHeaderV2.getDefinition_levels_byte_length();
+ if (null != pageBlockDecryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD,
chunk.getPageOrdinal(pageIndex));
+ }
+ DataPage dataPageV2 = new DataPageV2(
+ dataHeaderV2.getNum_rows(),
+ dataHeaderV2.getNum_nulls(),
+ dataHeaderV2.getNum_values(),
+
chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+
chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+ converter.getEncoding(dataHeaderV2.getEncoding()),
+ chunk.readAsBytesInput(dataSize),
+ uncompressedPageSize,
+ converter.fromParquetStatistics(
+ getFileMetaData().getCreatedBy(),
+ dataHeaderV2.getStatistics(),
+ type),
+ dataHeaderV2.isIs_compressed()
+ );
+ writePage(dataPageV2);
+ valuesCountReadSoFar += dataHeaderV2.getNum_values();
+ ++dataPageCountReadSoFar;
+ pageIndex++;
+ break;
+ default:
+ LOG.debug("skipping page of type {} of size {}",
pageHeader.getType(),
+ compressedPageSize);
+ chunk.stream.skipFully(compressedPageSize);
+ break;
+ }
+ } catch (Exception e) {
+ LOG.info("Exception while reading one more page for: [{} ]: {} ",
Thread.currentThread().getName(), this);
+ throw e;
+ }
+ finally {
+ long timeSpentInRead = System.nanoTime() - startRead;
+ totalCountReadOnePage.add(1);
+ totalTimeReadOnePage.add(timeSpentInRead);
+ maxTimeReadOnePage.accumulate(timeSpentInRead);
+ }
+ }
+
+ private void writePage(DataPage page){
Review Comment:
Let's rename it to writePageToChunk.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1455,6 +1578,8 @@ protected PageHeader readPageHeader() throws IOException {
}
protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor,
byte[] pageHeaderAAD) throws IOException {
+ String mode = (isAsyncIOReaderEnabled())? "ASYNC":"SYNC";
Review Comment:
Check whether debug logging is enabled first.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f,
ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
+ private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new
LinkedBlockingDeque<>();
+ private DictionaryPage dictionaryPage = null;
+ private int pageIndex = 0;
+ private long valuesCountReadSoFar = 0;
+ private int dataPageCountReadSoFar = 0;
+
+ // derived
+ private final PrimitiveType type;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
+ private byte[] dataPageHeaderAAD = null;
+
+ private final BytesInputDecompressor decompressor;
+
+ private final ConcurrentLinkedQueue<Future<Void>> readFutures = new
ConcurrentLinkedQueue<>();
+
+ private final LongAdder totalTimeReadOnePage = new LongAdder();
+ private final LongAdder totalCountReadOnePage = new LongAdder();
+ private final LongAccumulator maxTimeReadOnePage = new
LongAccumulator(Long::max, 0L);
+ private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+ private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+ private final LongAccumulator maxTimeBlockedPagesInChunk = new
LongAccumulator(Long::max, 0L);
+
+ public PageReader(Chunk chunk, int currentBlock, Decryptor
headerBlockDecryptor,
+ Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int
columnOrdinal,
+ BytesInputDecompressor decompressor
+ ) {
+ this.chunk = chunk;
+ this.currentBlock = currentBlock;
+ this.headerBlockDecryptor = headerBlockDecryptor;
+ this.pageBlockDecryptor = pageBlockDecryptor;
+ this.aadPrefix = aadPrefix;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.decompressor = decompressor;
+
+ this.type = getFileMetaData().getSchema()
+ .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPageHeader,
+ rowGroupOrdinal,
+ columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+ }
+ if (null != pageBlockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPage, rowGroupOrdinal,
+ columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPage,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ dataPageAAD = null;
+ dictionaryPageAAD = null;
+ }
+ }
+
Review Comment:
remove one empty line
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1417,11 +1530,21 @@ List<Chunk> build() {
for (Entry<ChunkDescriptor, ChunkData> entry : entries) {
ChunkDescriptor descriptor = entry.getKey();
ChunkData data = entry.getValue();
+ ByteBufferInputStream byteBufferInputStream;
+ if (isAsyncIOReaderEnabled()) {
+ // For async reads, we use a SequenceByteBufferInputStream instead of a ByteBufferInputStream
+ // because calling sliceBuffers in the latter blocks until all the buffers are read (effectively
+ // nullifying the async read) while the former blocks only if the next buffer is unavailable.
+ byteBufferInputStream = new SequenceByteBufferInputStream(data.streams);
+ } else {
+ byteBufferInputStream = ByteBufferInputStream.wrap(data.buffers);
+ }
if (descriptor.equals(lastDescriptor)) {
+ // because of a bug, the last chunk might be larger than descriptor.size
- chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount));
+ chunks.add(new WorkaroundChunk(lastDescriptor, byteBufferInputStream, f, data.offsetIndex,
Review Comment:
So if isAsyncIOReaderEnabled() is not true, we still use byteBufferInputStream?
In line 1497, you comment that only the asynchronous path sets the stream.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f,
ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
Review Comment:
I like the idea of categorizing the variables!
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f,
ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
+ private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new
LinkedBlockingDeque<>();
+ private DictionaryPage dictionaryPage = null;
+ private int pageIndex = 0;
+ private long valuesCountReadSoFar = 0;
+ private int dataPageCountReadSoFar = 0;
+
+ // derived
+ private final PrimitiveType type;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
+ private byte[] dataPageHeaderAAD = null;
+
+ private final BytesInputDecompressor decompressor;
+
+ private final ConcurrentLinkedQueue<Future<Void>> readFutures = new
ConcurrentLinkedQueue<>();
+
+ private final LongAdder totalTimeReadOnePage = new LongAdder();
+ private final LongAdder totalCountReadOnePage = new LongAdder();
+ private final LongAccumulator maxTimeReadOnePage = new
LongAccumulator(Long::max, 0L);
+ private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+ private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+ private final LongAccumulator maxTimeBlockedPagesInChunk = new
LongAccumulator(Long::max, 0L);
+
+ public PageReader(Chunk chunk, int currentBlock, Decryptor
headerBlockDecryptor,
+ Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int
columnOrdinal,
+ BytesInputDecompressor decompressor
+ ) {
+ this.chunk = chunk;
+ this.currentBlock = currentBlock;
+ this.headerBlockDecryptor = headerBlockDecryptor;
+ this.pageBlockDecryptor = pageBlockDecryptor;
+ this.aadPrefix = aadPrefix;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.decompressor = decompressor;
+
+ this.type = getFileMetaData().getSchema()
+ .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPageHeader,
+ rowGroupOrdinal,
+ columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+ }
+ if (null != pageBlockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPage, rowGroupOrdinal,
+ columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPage,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ dataPageAAD = null;
+ dictionaryPageAAD = null;
+ }
+ }
+
+
+ public DictionaryPage getDictionaryPage(){
+ return this.dictionaryPage;
+ }
+
+ public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+ return this.pagesInChunk;
+ }
+
+ void readAllRemainingPagesAsync() {
+ readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+ }
+
+ void readAllRemainingPages() throws IOException {
+ while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+ readOnePage();
+ }
+ if (chunk.offsetIndex == null && valuesCountReadSoFar !=
chunk.descriptor.metadata.getValueCount()) {
+ // Would be nice to have a CorruptParquetFileException or something as
a subclass?
+ throw new IOException(
+ "Expected " + chunk.descriptor.metadata.getValueCount() + " values
in column chunk at " +
+ getPath() + " offset " +
chunk.descriptor.metadata.getFirstDataPageOffset() +
+ " but got " + valuesCountReadSoFar + " values instead over " +
pagesInChunk.size()
+ + " pages ending at file offset " + (chunk.descriptor.fileOffset +
chunk.stream.position()));
+ }
+ try {
+ pagesInChunk.put(Optional.empty()); // add a marker for end of data
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while reading page data", e);
+ }
+ }
+
+ void readOnePage() throws IOException {
+ long startRead = System.nanoTime();
+ try {
+ byte[] pageHeaderAAD = dataPageHeaderAAD;
+ if (null != headerBlockDecryptor) {
+ // Important: this verifies file integrity (makes sure dictionary
page had not been removed)
+ if (null == dictionaryPage &&
chunk.descriptor.metadata.hasDictionaryPage()) {
+ pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPageHeader,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
+ }
+ PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor,
pageHeaderAAD);
+ int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+ int compressedPageSize = pageHeader.getCompressed_page_size();
+ final BytesInput pageBytes;
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ // there is only one dictionary page per column chunk
+ if (dictionaryPage != null) {
+ throw new ParquetDecodingException(
+ "more than one dictionary page in column " +
chunk.descriptor.col);
+ }
+ pageBytes = chunk.readAsBytesInput(compressedPageSize);
+ if (options.usePageChecksumVerification() &&
pageHeader.isSetCrc()) {
+ chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify dictionary page integrity, CRC checksum
verification failed");
+ }
+ DictionaryPageHeader dicHeader =
pageHeader.getDictionary_page_header();
+ DictionaryPage compressedDictionaryPage =
+ new DictionaryPage(
+ pageBytes,
+ uncompressedPageSize,
+ dicHeader.getNum_values(),
+ converter.getEncoding(dicHeader.getEncoding())
+ );
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ compressedDictionaryPage.setCrc(pageHeader.getCrc());
+ }
+ dictionaryPage = compressedDictionaryPage;
Review Comment:
What is the reason for using a temp variable 'compressedDictionaryPage'? I
guess the compiler will remove it anyway, though.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f,
ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
+ private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new
LinkedBlockingDeque<>();
+ private DictionaryPage dictionaryPage = null;
+ private int pageIndex = 0;
+ private long valuesCountReadSoFar = 0;
+ private int dataPageCountReadSoFar = 0;
+
+ // derived
+ private final PrimitiveType type;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
+ private byte[] dataPageHeaderAAD = null;
+
+ private final BytesInputDecompressor decompressor;
+
+ private final ConcurrentLinkedQueue<Future<Void>> readFutures = new
ConcurrentLinkedQueue<>();
+
+ private final LongAdder totalTimeReadOnePage = new LongAdder();
+ private final LongAdder totalCountReadOnePage = new LongAdder();
+ private final LongAccumulator maxTimeReadOnePage = new
LongAccumulator(Long::max, 0L);
+ private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+ private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+ private final LongAccumulator maxTimeBlockedPagesInChunk = new
LongAccumulator(Long::max, 0L);
+
+ public PageReader(Chunk chunk, int currentBlock, Decryptor
headerBlockDecryptor,
+ Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int
columnOrdinal,
+ BytesInputDecompressor decompressor
+ ) {
+ this.chunk = chunk;
+ this.currentBlock = currentBlock;
+ this.headerBlockDecryptor = headerBlockDecryptor;
+ this.pageBlockDecryptor = pageBlockDecryptor;
+ this.aadPrefix = aadPrefix;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.decompressor = decompressor;
+
+ this.type = getFileMetaData().getSchema()
+ .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPageHeader,
+ rowGroupOrdinal,
+ columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+ }
+ if (null != pageBlockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DataPage, rowGroupOrdinal,
+ columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix,
ModuleType.DictionaryPage,
Review Comment:
@ggershinsky Can you review all the decryption-related changes?
> Implement async IO for Parquet file reader
> ------------------------------------------
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Reporter: Parth Chandra
> Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) -
> - For every column -> Read from storage in 8MB blocks -> Read all
> uncompressed pages into output queue
> - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is really
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So
> for column _i_: reading one chunk until end, from storage -> intermediate
> output queue -> read one uncompressed page until end -> output queue ->
> (downstream) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and
> downstream implementations like Iceberg and Spark will automatically be able
> to take advantage without code change as long as the ParquetFileReader apis
> are not changed.
> In past work with async IO [Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java],
> I have seen a 2x-3x improvement in reading speed for Parquet files.
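For readers unfamiliar with the hand-off pattern described above, here is a
minimal sketch of the per-column-chunk queue the PR builds on: the reading side
pushes pages into a blocking queue and an empty Optional marks end of data (the
real reader uses a LinkedBlockingDeque<Optional<DataPage>>). The class and
method names below are hypothetical.

    import java.util.Optional;
    import java.util.concurrent.LinkedBlockingDeque;

    class PageQueueSketch<T> {
      private final LinkedBlockingDeque<Optional<T>> pagesInChunk = new LinkedBlockingDeque<>();

      void producerSide(Iterable<T> pages) throws InterruptedException {
        for (T page : pages) {
          pagesInChunk.put(Optional.of(page)); // may block if the consumer falls behind
        }
        pagesInChunk.put(Optional.empty());    // end-of-data marker
      }

      void consumerSide() throws InterruptedException {
        Optional<T> next;
        while ((next = pagesInChunk.take()).isPresent()) {
          // downstream: decompress and decode next.get()
        }
      }
    }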
--
This message was sent by Atlassian Jira
(v8.20.7#820007)