jhorstmann commented on code in PR #8080:
URL: https://github.com/apache/arrow-rs/pull/8080#discussion_r2282477718
##########
parquet/src/util/push_buffers.rs:
##########
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::reader::{ChunkReader, Length};
+use bytes::Bytes;
+use std::fmt::Display;
+use std::ops::Range;
+
+/// Holds multiple buffers of data that have been requested by the ParquetDecoder
+///
+/// This is the in-memory buffer for the ParquetDecoder
+///
+/// Features it has:
+/// 1. Zero copy as much as possible
+/// 2. Keeps non-contiguous ranges of bytes
+#[derive(Debug, Clone)]
+pub(crate) struct PushBuffers {
+    /// the virtual "offset" of these buffers (added to any request)
+    offset: u64,
+    /// The total length of the file being decoded
+    file_len: u64,
+    /// The ranges of data that are available for decoding (not adjusted for offset)
+    ranges: Vec<Range<u64>>,
+    /// The buffers of data that can be used to decode the Parquet file
+    buffers: Vec<Bytes>,
+}
+
+impl Display for PushBuffers {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        writeln!(
+            f,
+            "Buffers (offset: {}, file_len: {})",
+            self.offset, self.file_len
+        )?;
+        writeln!(f, "Available Ranges (w/ offset):")?;
+        for range in &self.ranges {
+            writeln!(
+                f,
+                "  {}..{} ({}..{}): {} bytes",
+                range.start,
+                range.end,
+                range.start + self.offset,
+                range.end + self.offset,
+                range.end - range.start
+            )?;
+        }
+
+        Ok(())
+    }
+}
+
+impl PushBuffers {
+    /// Create a new Buffers instance with the given file length
+    pub fn new(file_len: u64) -> Self {
+        Self {
+            offset: 0,
+            file_len,
+            ranges: Vec::new(),
+            buffers: Vec::new(),
+        }
+    }
+
+    /// Push all the ranges and buffers
+    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        assert_eq!(
+            ranges.len(),
+            buffers.len(),
+            "Number of ranges must match number of buffers"
+        );
+        for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) {
+            self.push_range(range, buffer);
+        }
+    }
+
+    /// Push a new range and its associated buffer
+    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
+        assert_eq!(
+            (range.end - range.start) as usize,
+            buffer.len(),
+            "Range length must match buffer length"
+        );
+        self.ranges.push(range);
+        self.buffers.push(buffer);
+    }
+
+    /// Returns true if the Buffers contains data for the given range
+    pub fn has_range(&self, range: &Range<u64>) -> bool {
+        self.ranges
+            .iter()
+            .any(|r| r.start <= range.start && r.end >= range.end)
+    }
+
+    fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
+        self.ranges.iter().zip(self.buffers.iter())
+    }
+
+    /// return the file length of the Parquet file being read
+    pub fn file_len(&self) -> u64 {
+        self.file_len
+    }
+
+    /// Specify a new offset
+    pub fn with_offset(mut self, offset: u64) -> Self {
+        self.offset = offset;
+        self
+    }
+}
+
+impl Length for PushBuffers {
+    fn len(&self) -> u64 {
+        self.file_len
+    }
+}
+
+/// less efficient implementation of Read for Buffers
+impl std::io::Read for PushBuffers {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // Find the range that contains the start offset
+        let mut found = false;
+        for (range, data) in self.iter() {
+            if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
+                // Found the range, figure out the starting offset in the buffer
+                let start_offset = (self.offset - range.start) as usize;
+                let end_offset = start_offset + buf.len();
+                let slice = data.slice(start_offset..end_offset);
+                buf.copy_from_slice(slice.as_ref());
+                found = true;

Review Comment:
   Should this loop `break` here after a range is found?
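   For illustration only, a minimal sketch of how the loop could short-circuit once a covering range is found, either by `break`ing or by returning the byte count directly. This is a simplified, hypothetical stand-in (struct name, error handling, and the omission of cursor advancement are assumptions), not the PR's actual implementation:

   ```rust
   use bytes::Bytes;
   use std::io::Read;
   use std::ops::Range;

   // Hypothetical, stripped-down stand-in for PushBuffers, just enough
   // to show the control flow in question.
   struct Buffers {
       offset: u64,
       ranges: Vec<Range<u64>>,
       buffers: Vec<Bytes>,
   }

   impl Read for Buffers {
       fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
           // Look for a range that covers [offset, offset + buf.len())
           for (range, data) in self.ranges.iter().zip(self.buffers.iter()) {
               if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
                   let start = (self.offset - range.start) as usize;
                   let end = start + buf.len();
                   buf.copy_from_slice(data.slice(start..end).as_ref());
                   // Returning (or `break`ing) here avoids scanning the
                   // remaining ranges once a match has been found.
                   // (Cursor advancement is omitted for brevity.)
                   return Ok(buf.len());
               }
           }
           Err(std::io::Error::new(
               std::io::ErrorKind::UnexpectedEof,
               "no buffered range covers the requested read",
           ))
       }
   }
   ```

   Returning early also makes the `found` flag unnecessary, at the cost of moving the "range not found" handling into the loop's fall-through path.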