[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541021#comment-17541021 ]
ASF GitHub Bot commented on PARQUET-2149: ----------------------------------------- steveloughran commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r879586630 ########## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + private int fetchIndex = 0; + private final SeekableInputStream fileInputStream; + private int readIndex = 0; + private ExecutorService threadPool; + private LinkedBlockingQueue<Future<Void>> readFutures; + private boolean closed = false; + + private LongAdder totalTimeBlocked = new LongAdder(); + private LongAdder totalCountBlocked = new LongAdder(); + private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream, + List<ByteBuffer> buffers) { + super(buffers); + this.fileInputStream = fileInputStream; + this.threadPool = threadPool; + readFutures = new LinkedBlockingQueue<>(buffers.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { + LOG.debug("ASYNC: buffer {} ", buf); + } + } + fetchAll(); + } + + private void checkState() { + if (closed) { + throw new RuntimeException("Stream is closed"); + } + } + + private void fetchAll() { + checkState(); + submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { + ByteBuffer buffer = buffers.get(bufferNo); + try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { + submitReadTask(bufferNo + 1); + } + return null; + }) + ); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private void readOneBuffer(ByteBuffer buffer) { + long startTime = System.nanoTime(); + try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + long putStart = System.nanoTime(); + long putCompleted = System.nanoTime(); + LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}", Review Comment: how does this work? ########## parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an + * <i>ordered</i> collection of ByteBufferInputStreams. + * <p> + * This class, as implemented, is intended only for a specific use in the ParquetFileReader and + * throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended + * use in other cases. + * <p> + * Even thought this class is derived from ByteBufferInputStream it explicitly does not support any + * byte buffer related methods like slice. It does, however support sliceBuffers which is a + * curious case of reading data from underlying streams + * <p> + * Even though this class changes the state of the underlying streams (by reading from them) + * it does not own them and so the close method does not close the streams. To avoid resource + * leaks the calling code should close the underlying streams + */ +public class SequenceByteBufferInputStream extends ByteBufferInputStream { + + Collection<ByteBufferInputStream> collection; + Iterator<ByteBufferInputStream> iterator; + ByteBufferInputStream current; + long position = 0; + + @Override + public String toString() { + return "SequenceByteBufferInputStream{" + + "collection=" + collection + + ", current=" + current + + ", position=" + position + + '}'; + } + + public SequenceByteBufferInputStream(Collection<ByteBufferInputStream> collection) { + this.collection = collection; + iterator = collection.iterator(); + current = iterator.hasNext() ? iterator.next() : null; + if (current == null) { + throw new UnsupportedOperationException( + "Initializing SequenceByteBufferInputStream with an empty collection is not supported"); + } + } + + @Override + public long position() { + return position; + } + + @Override + public int read(ByteBuffer out) { + int len = out.remaining(); + if (len <= 0) { + return 0; + } + if (current == null) { + return -1; + } + int totalBytesRead = 0; + while (totalBytesRead < len) { + int bytesRead = current.read(out); + if (bytesRead == -1) { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + break; + } + } else { + totalBytesRead += bytesRead; + } + } + position += totalBytesRead; + return totalBytesRead; + } + + @Override + public ByteBuffer slice(int length) throws EOFException { + throw new UnsupportedOperationException("slice is not supported"); + } + + @Override + /** + * This is a blocking call. Use with care when using in asynchronous mode. + */ + public List<ByteBuffer> sliceBuffers(long len) throws EOFException { + if (len <= 0) { + return Collections.emptyList(); + } + + if (current == null) { + throw new EOFException(); + } + + List<ByteBuffer> buffers = new ArrayList<>(); + long bytesAccumulated = 0; + while (bytesAccumulated < len) { + // This is not strictly according to the input stream contract, but once again the + // underlying implementations of ByteBufferInputStream return the available bytes + // based on the size of the underlying buffers rather than the bytes currently read + // into the buffers. This works for us because the underlying implementations will + // actually fill the buffers with the data before returning the slices we ask for + // (which is why this is a blocking call) + if (current.available() > 0) { + int bufLen = (int) Math.min(len - bytesAccumulated, current.available()); + List<ByteBuffer> currentSlices = current.sliceBuffers(bufLen); + buffers.addAll(currentSlices); + bytesAccumulated += bufLen; + + // update state; the bytes are considered read + this.position += bufLen; + } else { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + // there are no more streams + throw new EOFException(); + } + } + } + position += bytesAccumulated; + return buffers; + } + + @Override + public ByteBufferInputStream sliceStream(long length) throws EOFException { + throw new UnsupportedOperationException("sliceStream is not supported"); + } + + @Override + public List<ByteBuffer> remainingBuffers() { + throw new UnsupportedOperationException("remainingBuffers is not supported"); + } + + @Override + public ByteBufferInputStream remainingStream() { + throw new UnsupportedOperationException("remainingStream is not supported"); + } + + @Override + public int read() throws IOException { + int val; + while (true) { + try { + val = current.read() & 0xFF; // as unsigned + position += 1; + break; + } catch (EOFException e) { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + throw new EOFException("End of streams"); Review Comment: InputStream.read mandates return -1 on eof ########## parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an + * <i>ordered</i> collection of ByteBufferInputStreams. + * <p> + * This class, as implemented, is intended only for a specific use in the ParquetFileReader and + * throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended + * use in other cases. + * <p> + * Even thought this class is derived from ByteBufferInputStream it explicitly does not support any + * byte buffer related methods like slice. It does, however support sliceBuffers which is a + * curious case of reading data from underlying streams + * <p> + * Even though this class changes the state of the underlying streams (by reading from them) + * it does not own them and so the close method does not close the streams. To avoid resource + * leaks the calling code should close the underlying streams + */ +public class SequenceByteBufferInputStream extends ByteBufferInputStream { + + Collection<ByteBufferInputStream> collection; + Iterator<ByteBufferInputStream> iterator; + ByteBufferInputStream current; + long position = 0; + + @Override + public String toString() { + return "SequenceByteBufferInputStream{" + + "collection=" + collection + + ", current=" + current + + ", position=" + position + + '}'; + } + + public SequenceByteBufferInputStream(Collection<ByteBufferInputStream> collection) { + this.collection = collection; + iterator = collection.iterator(); + current = iterator.hasNext() ? iterator.next() : null; + if (current == null) { + throw new UnsupportedOperationException( + "Initializing SequenceByteBufferInputStream with an empty collection is not supported"); + } + } + + @Override + public long position() { + return position; + } + + @Override + public int read(ByteBuffer out) { Review Comment: are you 100% confident that all uses of read() in parquet are in a single thread? it is not unknown for apps to read across threads, even though the java APIs say "don't" > Implement async IO for Parquet file reader > ------------------------------------------ > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr > Reporter: Parth Chandra > Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)