thinkharderdev commented on code in PR #6676:
URL: https://github.com/apache/arrow-rs/pull/6676#discussion_r1894947151
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -660,36 +705,89 @@ where
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
             match &mut self.state {
-                StreamState::Decoding(batch_reader) => match batch_reader.next() {
-                    Some(Ok(batch)) => {
-                        return Poll::Ready(Some(Ok(batch)));
+                StreamState::Decoding(batch_reader) => {
+                    let res: Self::Item = match batch_reader.next() {
+                        Some(Ok(batch)) => Ok(batch),
+                        Some(Err(e)) => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+                        }
+                        None => {
+                            self.state = StreamState::Init;
+                            continue;
+                        }
+                    };
+
+                    if !self.prefetch_row_groups
+                        || self.row_groups.is_empty()
+                        || self.next_reader.is_some()
+                    {
+                        return Poll::Ready(Some(res));
                     }
-                    Some(Err(e)) => {
-                        self.state = StreamState::Error;
-                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+
+                    let old_state = std::mem::replace(&mut self.state, StreamState::Init);
+
+                    let row_group_idx = self.row_groups.pop_front().unwrap(); // already checked that row_groups is not empty
+
+                    let fut = self.read_row_group(row_group_idx);
+
+                    if let StreamState::Decoding(batch_reader) = old_state {
+                        self.state = StreamState::Prefetch(batch_reader, fut);
+                        return Poll::Ready(Some(res));
+                    } else {
+                        unreachable!()
                     }
-                    None => self.state = StreamState::Init,
-                },
+                }
+                StreamState::Prefetch(batch_reader, f) => {
+                    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
Review Comment:
The concern I have here is that this effectively decouples polling of the
underlying IO operations from the reactor. We end up polling the future more or
less continuously, since the consumer is most likely just polling the stream in
a loop while it decodes.
A different way to handle this would be a `Prefetcher` which can spawn the
prefetch op in the background and let the runtime poll it. You can either
"assume tokio", in which case the prefetcher is just `tokio::spawn`, or try to
abstract over the async runtime behind a trait.
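A rough sketch of what that could look like (assuming the `futures` and `tokio`
crates; the `Prefetcher` trait, `TokioPrefetcher`, and these signatures are
hypothetical, not part of this PR or of arrow-rs):

```rust
use futures::future::BoxFuture;
use futures::FutureExt;

/// Hypothetical runtime abstraction: something that can drive a future in
/// the background so the reactor wakes it, instead of the stream re-polling
/// it with a noop waker on every `poll_next`.
trait Prefetcher: Send + Sync {
    /// Spawn `fut` on the runtime and return a handle that resolves to its
    /// output once the background task completes.
    fn spawn_prefetch<T: Send + 'static>(
        &self,
        fut: BoxFuture<'static, T>,
    ) -> BoxFuture<'static, T>;
}

/// The "assume tokio" variant: the prefetcher is just `tokio::spawn`.
struct TokioPrefetcher;

impl Prefetcher for TokioPrefetcher {
    fn spawn_prefetch<T: Send + 'static>(
        &self,
        fut: BoxFuture<'static, T>,
    ) -> BoxFuture<'static, T> {
        let handle = tokio::spawn(fut);
        // JoinHandle yields Result<T, JoinError>; surface panics for brevity.
        async move { handle.await.expect("prefetch task panicked") }.boxed()
    }
}
```

One wrinkle this glosses over: the row-group read future borrows the stream, so
it would need to be restructured to own its inputs (`'static`) before it could
be handed to `tokio::spawn`.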
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -660,36 +705,89 @@ where
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
             match &mut self.state {
-                StreamState::Decoding(batch_reader) => match batch_reader.next() {
-                    Some(Ok(batch)) => {
-                        return Poll::Ready(Some(Ok(batch)));
+                StreamState::Decoding(batch_reader) => {
+                    let res: Self::Item = match batch_reader.next() {
+                        Some(Ok(batch)) => Ok(batch),
+                        Some(Err(e)) => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+                        }
+                        None => {
+                            self.state = StreamState::Init;
+                            continue;
+                        }
+                    };
+
+                    if !self.prefetch_row_groups
+                        || self.row_groups.is_empty()
+                        || self.next_reader.is_some()
+                    {
+                        return Poll::Ready(Some(res));
                    }
-                    Some(Err(e)) => {
-                        self.state = StreamState::Error;
-                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+
+                    let old_state = std::mem::replace(&mut self.state, StreamState::Init);
+
+                    let row_group_idx = self.row_groups.pop_front().unwrap(); // already checked that row_groups is not empty
+
+                    let fut = self.read_row_group(row_group_idx);
+
+                    if let StreamState::Decoding(batch_reader) = old_state {
+                        self.state = StreamState::Prefetch(batch_reader, fut);
+                        return Poll::Ready(Some(res));
+                    } else {
+                        unreachable!()
                    }
-                    None => self.state = StreamState::Init,
-                },
+                }
+                StreamState::Prefetch(batch_reader, f) => {
+                    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
+                    match f.poll_unpin(&mut noop_cx) {
Review Comment:
When I have tried to think about this problem at various points, this is the
part I get hung up on. In theory, polling the next reader here should just be
driving IO (and hence be cheap). But reading the row group involves both IO and
evaluating row filters, which can take non-trivial compute and memory, since
we evaluate the row filter over the entire row group in one shot.
To really implement prefetch I think we need finer-grained control over how the
IO and CPU operations are pipelined, e.g. prefetch should only perform the next
IO operation.
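To make that concrete, here is a minimal sketch of the IO/CPU split being
suggested (all names here, `RowGroupBytes`, `fetch_ranges`, `decode_and_filter`,
are hypothetical stand-ins, not the arrow-rs internals):

```rust
use std::ops::Range;

/// Stand-in for the raw column chunk bytes of one row group.
struct RowGroupBytes(Vec<Vec<u8>>);

/// Placeholder for a single ranged read against the underlying store.
async fn read_range(_range: Range<u64>) -> Vec<u8> {
    Vec::new()
}

/// Pure-IO phase: safe to run in a background task, since it does no
/// decompression, decoding, or filter evaluation.
async fn fetch_ranges(ranges: Vec<Range<u64>>) -> RowGroupBytes {
    let mut chunks = Vec::with_capacity(ranges.len());
    for range in ranges {
        chunks.push(read_range(range).await);
    }
    RowGroupBytes(chunks)
}

/// CPU phase: runs on the consumer's poll rather than in the prefetch task,
/// so row-filter evaluation over the whole row group is not forced to
/// happen eagerly in the background.
fn decode_and_filter(bytes: RowGroupBytes) -> usize {
    // Placeholder for decompress + decode + RowFilter evaluation.
    bytes.0.iter().map(|c| c.len()).sum()
}
```

With that split, the prefetch state only overlaps network latency with decoding
of the current row group; the expensive filter evaluation still runs exactly
when the consumer asks for the next one.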