wiedld commented on code in PR #7379:
URL: https://github.com/apache/arrow-datafusion/pull/7379#discussion_r1330506718
##########
datafusion/physical-plan/src/sorts/stream.rs:
##########
@@ -208,3 +237,278 @@ impl<T: FieldArray> PartitionedStream for FieldCursorStream<T> {
}))
}
}
+
+/// A wrapper around [`CursorStream`] that provides polling of a subset of the partitioned streams.
+///
+/// This is used in the leaf nodes of the cascading merge tree,
+/// so that the same [`CursorStream`] (with the same RowConverter)
+/// can be polled separately by multiple leaf nodes.
+///
+/// The visible subset is the absolute partition index range `offset..limit`.
+pub struct OffsetCursorStream<C: Cursor> {
+ /// Shared input streams. [`BatchTrackingStream`] is a [`CursorStream`].
+ /// NOTE(review): every poll goes through this `Mutex`; per the PR discussion
+ /// it is currently required because of RowConverter requirements.
+ streams: Arc<Mutex<BatchTrackingStream<C>>>,
+ /// Absolute index of the first partition in this subset (inclusive).
+ offset: usize,
+ /// Absolute index one past the last partition in this subset (exclusive).
+ /// Expected to satisfy `offset <= limit`, since the partition count is `limit - offset`.
+ limit: usize,
+}
+
+ impl<C: Cursor> OffsetCursorStream<C> {
+ /// Creates a view over the partitions `offset..limit` of the shared `streams`.
+ ///
+ /// `offset` is the absolute index of the first exposed partition (inclusive) and
+ /// `limit` is the absolute index one past the last exposed partition (exclusive),
+ /// so the view exposes `limit - offset` partitions, addressed from index 0.
+ ///
+ /// # Panics
+ ///
+ /// In debug builds, panics if `offset > limit`: such a range would make the
+ /// partition count (`limit - offset`) underflow.
+ pub fn new(
+ streams: Arc<Mutex<BatchTrackingStream<C>>>,
+ offset: usize,
+ limit: usize,
+ ) -> Self {
+ // Catch an inverted range at construction time rather than as a usize
+ // underflow later, when the partition count is computed.
+ debug_assert!(
+ offset <= limit,
+ "OffsetCursorStream offset {} must not exceed limit {}",
+ offset,
+ limit
+ );
+ Self {
+ streams,
+ offset,
+ limit,
+ }
+ }
+}
+
+impl<C: Cursor> PartitionedStream for OffsetCursorStream<C> {
+ type Output = Result<BatchCursor<C>>;
+
+ fn partitions(&self) -> usize {
+ self.limit - self.offset
+ }
+
+ fn poll_next(
+ &mut self,
+ cx: &mut Context<'_>,
+ stream_idx: usize,
+ ) -> Poll<Option<Self::Output>> {
+ let stream_abs_idx = stream_idx + self.offset;
+ if stream_abs_idx >= self.limit {
+ return Poll::Ready(Some(Err(DataFusionError::Internal(format!(
+ "Invalid stream index {} for offset {} and limit {}",
+ stream_idx, self.offset, self.limit
+ )))));
+ }
+ Poll::Ready(ready!(self.streams.lock().poll_next(cx, stream_abs_idx)))
Review Comment:
This mutex around the stream poll is currently required because of the row converter
requirements. After the next arrow-rs bump, I will check whether it can be removed
(verifying this by adding `Sync` bounds to the upstream elements it consumes).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]