sunchao commented on a change in pull request #6949:
URL: https://github.com/apache/arrow/pull/6949#discussion_r413158295
##########
File path: rust/parquet/src/util/io.rs
##########
@@ -31,47 +33,83 @@ pub trait Position {
}
/// Struct that represents a slice of a file data with independent start
position and
-/// length. Internally clones provided file handle, wraps with BufReader and
resets
-/// position before any read.
+/// length. Internally clones provided file handle, wraps with a custom
implementation
+/// of BufReader that resets position before any read.
///
/// This is workaround and alternative for `file.try_clone()` method. It
clones `File`
/// while preserving independent position, which is not available with
`try_clone()`.
///
-/// Designed after `arrow::io::RandomAccessFile`.
+/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader`
pub struct FileSource<R: ParquetReader> {
- reader: Mutex<BufReader<R>>,
- start: u64, // start position in a file
- end: u64, // end position in a file
+ reader: RefCell<R>,
+ start: u64, // start position in a file
+ end: u64, // end position in a file
+ buf: Vec<u8>, // the internal buffer `buf` in BufReader
+ buf_pos: usize, // equivalent to the `pos` param in BufReader
Review comment:
This is not very useful, as it requires people to jump into `BufReader` to
check its documentation. Maybe add comments explaining what these two
fields are for?
##########
File path: rust/parquet/src/util/io.rs
##########
@@ -31,47 +33,83 @@ pub trait Position {
}
/// Struct that represents a slice of a file data with independent start
position and
-/// length. Internally clones provided file handle, wraps with BufReader and
resets
-/// position before any read.
+/// length. Internally clones provided file handle, wraps with a custom
implementation
+/// of BufReader that resets position before any read.
///
/// This is workaround and alternative for `file.try_clone()` method. It
clones `File`
/// while preserving independent position, which is not available with
`try_clone()`.
///
-/// Designed after `arrow::io::RandomAccessFile`.
+/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader`
pub struct FileSource<R: ParquetReader> {
- reader: Mutex<BufReader<R>>,
- start: u64, // start position in a file
- end: u64, // end position in a file
+ reader: RefCell<R>,
+ start: u64, // start position in a file
+ end: u64, // end position in a file
+ buf: Vec<u8>, // the internal buffer `buf` in BufReader
+ buf_pos: usize, // equivalent to the `pos` param in BufReader
+ buf_cap: usize, // equivalent to the `cap` param in BufReader
}
impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
+ let reader = RefCell::new(fd.try_clone().unwrap());
Self {
- reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())),
+ reader,
start,
end: start + length as u64,
+ buf: vec![0 as u8; DEFAULT_BUF_SIZE],
+ buf_pos: 0,
+ buf_cap: 0,
}
}
+
+ // inspired from BufReader
+ fn fill_buf(&mut self) -> Result<&[u8]> {
+ if self.buf_pos >= self.buf_cap {
+ // If we've reached the end of our internal buffer then we need to
fetch
+ // some more data from the underlying reader.
+ // Branch using `>=` instead of the more correct `==`
+ // to tell the compiler that the pos..cap slice is always valid.
+ debug_assert!(self.buf_pos == self.buf_cap);
+ let mut reader = self.reader.borrow_mut();
+ reader.seek(SeekFrom::Start(self.start))?; // always seek to start
before reading
+ self.buf_cap = reader.read(&mut self.buf)?;
+ self.buf_pos = 0;
+ }
+ Ok(&self.buf[self.buf_pos..self.buf_cap])
+ }
}
impl<R: ParquetReader> Read for FileSource<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- let mut reader = self
- .reader
- .lock()
- .map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?;
-
let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as
usize);
let buf = &mut buf[0..bytes_to_read];
- reader.seek(SeekFrom::Start(self.start as u64))?;
- let res = reader.read(buf);
- if let Ok(bytes_read) = res {
- self.start += bytes_read as u64;
+ // If we don't have any buffered data and we're doing a massive read
+ // (larger than our internal buffer), bypass our internal buffer
+ // entirely.
+ if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() {
+ // discard buffer
+ self.buf_pos = 0;
+ self.buf_cap = 0;
+ // read directly into param buffer
+ let mut reader = self.reader.borrow_mut();
Review comment:
nit: maybe we can extract this into a utility function?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]