[
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557369#comment-17557369
]
ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------
ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903469988
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java:
##########
@@ -61,10 +61,7 @@ public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties)
private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) {
if (null == columnKey) { // Decryptor with footer key
- if (null == aesGcmDecryptorWithFooterKey) {
- aesGcmDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);
- }
- return aesGcmDecryptorWithFooterKey;
+ return ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);
Review Comment:
Could you add a unit test of decryption with async I/O and the parallel column
reader, e.g. in
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java#L507
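A rough shape such a test could take (sketch only; `inputFile` and
`fileDecryptionProperties` are assumed test fixtures, and the builder toggles
for async I/O and the parallel column reader are hypothetical names standing
in for whatever options this PR actually introduces):

// Sketch: 'withAsyncIOEnabled' and 'withParallelColumnReaderEnabled' are
// hypothetical toggles for the options added by this PR.
@Test
public void testReadEncryptedWithAsyncParallelReader() throws IOException {
  ParquetReadOptions options = ParquetReadOptions.builder()
    .withDecryption(fileDecryptionProperties)
    .withAsyncIOEnabled(true)               // hypothetical
    .withParallelColumnReaderEnabled(true)  // hypothetical
    .build();
  try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) {
    // iterate row groups/pages and assert decrypted values match expectations
  }
}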
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
+ public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+ // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+ // to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+ r -> new Thread(r, "parquet-io"));
+
+ // Thread pool to process pages for multiple columns in parallel. Applications should call
+ // setAsyncProcessThreadPool to initialize this with their own implementations.
+ // Default initialization is useful only for testing
+ public static ExecutorService processThreadPool = Executors.newCachedThreadPool(
Review Comment:
Should we be creating thread pools if async I/O and parallel column reading
are not activated?
(here and on line 135)
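One possible shape (sketch only, not a concrete proposal):

// Sketch: initialize the pool lazily on first use, so nothing is created
// unless async I/O / parallel column reading is actually enabled.
private static volatile ExecutorService ioThreadPool;

public static ExecutorService getIoThreadPool() {
  if (ioThreadPool == null) {
    synchronized (ParquetFileReader.class) {
      if (ioThreadPool == null) {
        ioThreadPool = Executors.newCachedThreadPool(r -> new Thread(r, "parquet-io"));
      }
    }
  }
  return ioThreadPool;
}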
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
Review Comment:
We already have a PageReader (interface). Could you rename this class?
##########
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+ private int fetchIndex = 0;
+ private final SeekableInputStream fileInputStream;
+ private int readIndex = 0;
+ private ExecutorService threadPool;
+ private LinkedBlockingQueue<Future<Void>> readFutures;
+ private boolean closed = false;
+
+ private LongAdder totalTimeBlocked = new LongAdder();
+ private LongAdder totalCountBlocked = new LongAdder();
+ private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+ AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+ List<ByteBuffer> buffers) {
+ super(buffers);
+ this.fileInputStream = fileInputStream;
+ this.threadPool = threadPool;
+ readFutures = new LinkedBlockingQueue<>(buffers.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ASYNC: Begin read into buffers ");
+ for (ByteBuffer buf : buffers) {
+ LOG.debug("ASYNC: buffer {} ", buf);
+ }
+ }
+ fetchAll();
+ }
+
+ private void checkState() {
+ if (closed) {
+ throw new RuntimeException("Stream is closed");
+ }
+ }
+
+ private void fetchAll() {
+ checkState();
+ submitReadTask(0);
+ }
+
+ private void submitReadTask(int bufferNo) {
+ ByteBuffer buffer = buffers.get(bufferNo);
+ try {
+ readFutures.put(threadPool.submit(() -> {
+ readOneBuffer(buffer);
+ if (bufferNo < buffers.size() - 1) {
+ submitReadTask(bufferNo + 1);
+ }
+ return null;
+ })
+ );
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void readOneBuffer(ByteBuffer buffer) {
+ long startTime = System.nanoTime();
+ try {
+ fileInputStream.readFully(buffer);
+ buffer.flip();
+ long readCompleted = System.nanoTime();
+ long timeSpent = readCompleted - startTime;
+ LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+ fetchIndex++;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean nextBuffer() {
+ checkState();
+ // hack: parent constructor can call this method before this class is fully initialized.
+ // Just return without doing anything.
+ if (readFutures == null) {
+ return false;
+ }
+ if (readIndex < buffers.size()) {
+ long start = System.nanoTime();
+ try {
+ LOG.debug("ASYNC (next): Getting next buffer");
+ Future<Void> future = readFutures.take();
+ future.get();
+ long timeSpent = System.nanoTime() - start;
+ totalCountBlocked.add(1);
+ totalTimeBlocked.add(timeSpent);
+ maxTimeBlocked.accumulate(timeSpent);
+ LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this,
timeSpent);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ LOG.error("Async (next): exception while getting next buffer: ", e);
+ throw new RuntimeException(e);
+ }
+ readIndex++;
+ }
+ return super.nextBuffer();
+ }
+
+ public void close() {
+ LOG.debug("ASYNC Stream: Blocked: {} {} {}", totalTimeBlocked.longValue()
/ 1000.0,
+ totalCountBlocked.longValue(), maxTimeBlocked.longValue() / 1000.0);
+ Future<Void> readResult;
+ while (!readFutures.isEmpty()) {
+ try {
+ readResult = readFutures.poll();
+ readResult.get();
+ if (!readResult.isDone() && !readResult.isCancelled()) {
+ readResult.cancel(true);
+ } else {
+ readResult.get(1, TimeUnit.MILLISECONDS);
+ }
+ } catch (Exception e) {
+ // Do nothing
+ }
+ }
+ closed = true;
+ }
+
Review Comment:
nit: empty line
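For context on the pattern above: submitReadTask chains the reads, so each
completed buffer read schedules the read of the next buffer while the consumer
blocks on the queued futures in order. A standalone sketch of the same pattern
(stand-in types; not the PR code):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Standalone sketch of the chained-read pattern in AsyncMultiBufferInputStream:
// each task fills one buffer, then submits the task for the next buffer, so at
// most one read is in flight and buffers complete in order.
public class ChainedReadSketch {
  private final ExecutorService pool = Executors.newCachedThreadPool();
  private final LinkedBlockingQueue<Future<Void>> futures;
  private final List<String> buffers; // stand-in for List<ByteBuffer>

  ChainedReadSketch(List<String> buffers) {
    this.buffers = buffers;
    // capacity == buffers.size(), so put() below never blocks
    this.futures = new LinkedBlockingQueue<>(buffers.size());
    submit(0); // kick off the chain, as fetchAll() does
  }

  private void submit(int i) {
    try {
      futures.put(pool.submit(() -> {
        System.out.println("filled " + buffers.get(i)); // real code: readFully(buffer)
        if (i < buffers.size() - 1) {
          submit(i + 1);
        }
        return null;
      }));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  // consumer side, as in nextBuffer(): block until the next buffer is ready
  void consumeAll() throws Exception {
    for (String buf : buffers) {
      futures.take().get();
      System.out.println("consumed " + buf);
    }
    pool.shutdown();
  }

  public static void main(String[] args) throws Exception {
    new ChainedReadSketch(Arrays.asList("buf0", "buf1", "buf2")).consumeAll();
  }
}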
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
public long endPos() {
return offset + length;
}
+
+ @Override
+ public String toString() {
+ return "ConsecutivePartList{" +
+ "offset=" + offset +
+ ", length=" + length +
+ ", chunks=" + chunks +
+ '}';
+ }
}
+
+ /**
+ * Encapsulates the reading of a single page.
+ */
+ public class PageReader implements Closeable {
+ private final Chunk chunk;
+ private final int currentBlock;
+ private final BlockCipher.Decryptor headerBlockDecryptor;
+ private final BlockCipher.Decryptor pageBlockDecryptor;
+ private final byte[] aadPrefix;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+
+ //state
+ private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+ private DictionaryPage dictionaryPage = null;
+ private int pageIndex = 0;
+ private long valuesCountReadSoFar = 0;
+ private int dataPageCountReadSoFar = 0;
+
+ // derived
+ private final PrimitiveType type;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
+ private byte[] dataPageHeaderAAD = null;
+
+ private final BytesInputDecompressor decompressor;
+
+ private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+ private final LongAdder totalTimeReadOnePage = new LongAdder();
+ private final LongAdder totalCountReadOnePage = new LongAdder();
+ private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+ private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+ private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+ private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+ public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor,
+ Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+ BytesInputDecompressor decompressor
+ ) {
+ this.chunk = chunk;
+ this.currentBlock = currentBlock;
+ this.headerBlockDecryptor = headerBlockDecryptor;
+ this.pageBlockDecryptor = pageBlockDecryptor;
+ this.aadPrefix = aadPrefix;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.decompressor = decompressor;
+
+ this.type = getFileMetaData().getSchema()
+ .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+ rowGroupOrdinal,
+ columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+ }
+ if (null != pageBlockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+ columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ dataPageAAD = null;
+ dictionaryPageAAD = null;
+ }
+ }
+
+
+ public DictionaryPage getDictionaryPage(){
+ return this.dictionaryPage;
+ }
+
+ public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk(){
+ return this.pagesInChunk;
+ }
+
+ void readAllRemainingPagesAsync() {
+ readFutures.offer(processThreadPool.submit(new PageReaderTask(this)));
+ }
+
+ void readAllRemainingPages() throws IOException {
+ while(hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+ readOnePage();
+ }
+ if (chunk.offsetIndex == null && valuesCountReadSoFar != chunk.descriptor.metadata.getValueCount()) {
+ // Would be nice to have a CorruptParquetFileException or something as a subclass?
+ throw new IOException(
+ "Expected " + chunk.descriptor.metadata.getValueCount() + " values in column chunk at " +
+ getPath() + " offset " + chunk.descriptor.metadata.getFirstDataPageOffset() +
+ " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ + " pages ending at file offset " + (chunk.descriptor.fileOffset + chunk.stream.position()));
+ }
+ try {
+ pagesInChunk.put(Optional.empty()); // add a marker for end of data
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while reading page data", e);
+ }
+ }
+
+ void readOnePage() throws IOException {
+ long startRead = System.nanoTime();
+ try {
+ byte[] pageHeaderAAD = dataPageHeaderAAD;
+ if (null != headerBlockDecryptor) {
+ // Important: this verifies file integrity (makes sure dictionary page had not been removed)
+ if (null == dictionaryPage && chunk.descriptor.metadata.hasDictionaryPage()) {
+ pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader,
+ rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
+ }
+ PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD);
+ int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+ int compressedPageSize = pageHeader.getCompressed_page_size();
+ final BytesInput pageBytes;
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ // there is only one dictionary page per column chunk
+ if (dictionaryPage != null) {
+ throw new ParquetDecodingException(
+ "more than one dictionary page in column " +
chunk.descriptor.col);
+ }
+ pageBytes = chunk.readAsBytesInput(compressedPageSize);
+ if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+ chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify dictionary page integrity, CRC checksum verification failed");
+ }
+ DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+ DictionaryPage compressedDictionaryPage =
+ new DictionaryPage(
+ pageBytes,
+ uncompressedPageSize,
+ dicHeader.getNum_values(),
+ converter.getEncoding(dicHeader.getEncoding())
+ );
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ compressedDictionaryPage.setCrc(pageHeader.getCrc());
+ }
+ dictionaryPage = compressedDictionaryPage;
+ break;
+ case DATA_PAGE:
+ DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+ pageBytes = chunk.readAsBytesInput(compressedPageSize);
+ if (null != pageBlockDecryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, chunk.getPageOrdinal(pageIndex));
+ }
+ if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+ chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify page integrity, CRC checksum verification failed");
+ }
+ DataPageV1 dataPageV1 = new DataPageV1(
+ pageBytes,
+ dataHeaderV1.getNum_values(),
+ uncompressedPageSize,
+ converter.fromParquetStatistics(
+ getFileMetaData().getCreatedBy(),
+ dataHeaderV1.getStatistics(),
+ type),
+ converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getEncoding()));
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ dataPageV1.setCrc(pageHeader.getCrc());
+ }
+ writePage(dataPageV1);
+ valuesCountReadSoFar += dataHeaderV1.getNum_values();
+ ++dataPageCountReadSoFar;
+ pageIndex++;
+ break;
+ case DATA_PAGE_V2:
+ DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
+ int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length()
+ - dataHeaderV2.getDefinition_levels_byte_length();
+ if (null != pageBlockDecryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, chunk.getPageOrdinal(pageIndex));
+ }
+ DataPage dataPageV2 = new DataPageV2(
+ dataHeaderV2.getNum_rows(),
+ dataHeaderV2.getNum_nulls(),
+ dataHeaderV2.getNum_values(),
+ chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+ chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+ converter.getEncoding(dataHeaderV2.getEncoding()),
+ chunk.readAsBytesInput(dataSize),
+ uncompressedPageSize,
+ converter.fromParquetStatistics(
+ getFileMetaData().getCreatedBy(),
+ dataHeaderV2.getStatistics(),
+ type),
+ dataHeaderV2.isIs_compressed()
+ );
+ writePage(dataPageV2);
+ valuesCountReadSoFar += dataHeaderV2.getNum_values();
+ ++dataPageCountReadSoFar;
+ pageIndex++;
+ break;
+ default:
+ LOG.debug("skipping page of type {} of size {}",
pageHeader.getType(),
+ compressedPageSize);
+ chunk.stream.skipFully(compressedPageSize);
+ break;
+ }
+ } catch (Exception e) {
+ LOG.info("Exception while reading one more page for: [{} ]: {} ",
Thread.currentThread().getName(), this);
+ throw e;
+ }
+ finally {
+ long timeSpentInRead = System.nanoTime() - startRead;
+ totalCountReadOnePage.add(1);
+ totalTimeReadOnePage.add(timeSpentInRead);
+ maxTimeReadOnePage.accumulate(timeSpentInRead);
+ }
+ }
+
+ private void writePage(DataPage page){
+ long writeStart = System.nanoTime();
+ try {
+ pagesInChunk.put(Optional.of(page));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while reading page data", e);
+ }
+ long timeSpent = System.nanoTime() - writeStart;
+ totalTimeBlockedPagesInChunk.add(timeSpent);
+ totalCountBlockedPagesInChunk.add(1);
+ maxTimeBlockedPagesInChunk.accumulate(timeSpent);
+ }
+
+ private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
+ return chunk.offsetIndex == null ? valuesCountReadSoFar < chunk.descriptor.metadata.getValueCount()
+ : dataPageCountReadSoFar < chunk.offsetIndex.getPageCount();
+ }
+
+ @Override
+ public void close() throws IOException {
+ Future<Void> readResult;
+ while(!readFutures.isEmpty()) {
+ try {
+ readResult = readFutures.poll();
+ readResult.get();
+ if(!readResult.isDone() && !readResult.isCancelled()){
+ readResult.cancel(true);
+ } else {
+ readResult.get(1, TimeUnit.MILLISECONDS);
+ }
+ } catch (Exception e) {
+ // Do nothing
+ }
+ }
+ String mode = isParallelColumnReaderEnabled()?"ASYNC" : "SYNC";
+ LOG.debug("READ PAGE: {}, {}, {}, {}, {}, {}, {}", mode,
Review Comment:
This sounds as if it were logged in a readPage call.
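Maybe rename it so it reads as a close-time summary, e.g. something like
(keeping the same arguments as above):

LOG.debug("PAGE READER CLOSE STATS: {}, {}, {}, {}, {}, {}, {}", mode, ...);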
> Implement async IO for Parquet file reader
> ------------------------------------------
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Reporter: Parth Chandra
> Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) -
> - For every column -> Read from storage in 8MB blocks -> Read all
> uncompressed pages into output queue
> - From the output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is really
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for Column _i_ -> read one chunk at a time from storage until the end ->
> intermediate output queue -> read one uncompressed page at a time until the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader and
> downstream implementations like Iceberg and Spark will automatically be able
> to take advantage without code change as long as the ParquetFileReader apis
> are not changed.
> In past work with async I/O [Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java],
> I have seen a 2x-3x improvement in reading speed for Parquet files.
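For illustration, the per-column pipeline described above could be sketched
roughly like this (all names hypothetical; the real change is internal to
ParquetFileReader, with Optional.empty() as the end-of-data marker as in the
PR):

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the proposed pipeline: one task reads raw pages from
// storage into a bounded queue, while the downstream consumer decompresses and
// decodes pages as they arrive.
public class AsyncColumnPipelineSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService ioPool = Executors.newCachedThreadPool();
    LinkedBlockingQueue<Optional<String>> pages = new LinkedBlockingQueue<>(16);

    ioPool.submit(() -> {
      for (int i = 0; i < 5; i++) {
        pages.put(Optional.of("page-" + i)); // stand-in for a read+parsed page
      }
      pages.put(Optional.empty()); // signal end of the column chunk
      return null;
    });

    // Downstream: decompression + decoding overlap with the reads above.
    Optional<String> page;
    while ((page = pages.take()).isPresent()) {
      System.out.println("decode " + page.get());
    }
    ioPool.shutdown();
  }
}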
--
This message was sent by Atlassian Jira
(v8.20.7#820007)