wiedld commented on code in PR #7379:
URL: https://github.com/apache/arrow-datafusion/pull/7379#discussion_r1348252777
##########
datafusion/physical-plan/src/sorts/stream.rs:
##########
@@ -208,3 +237,278 @@ impl<T: FieldArray> PartitionedStream for FieldCursorStream<T> {
}))
}
}
+
+/// A wrapper around [`CursorStream`] that provides polling of a subset of the
+/// partitioned streams.
+///
+/// This is used in the leaf nodes of the cascading merge tree,
+/// so that the same [`CursorStream`] (with the same RowConverter)
+/// can be separately polled by multiple leaf nodes.
+pub struct OffsetCursorStream<C: Cursor> {
+ // Input streams. [`BatchTrackingStream`] is a [`CursorStream`].
+ streams: Arc<Mutex<BatchTrackingStream<C>>>,
+ offset: usize,
+ limit: usize,
+}
+
+impl<C: Cursor> OffsetCursorStream<C> {
+ pub fn new(
+ streams: Arc<Mutex<BatchTrackingStream<C>>>,
+ offset: usize,
+ limit: usize,
+ ) -> Self {
+ Self {
+ streams,
+ offset,
+ limit,
+ }
+ }
+}
+
+impl<C: Cursor> PartitionedStream for OffsetCursorStream<C> {
+ type Output = Result<BatchCursor<C>>;
+
+ fn partitions(&self) -> usize {
+ self.limit - self.offset
+ }
+
+ fn poll_next(
+ &mut self,
+ cx: &mut Context<'_>,
+ stream_idx: usize,
+ ) -> Poll<Option<Self::Output>> {
+ let stream_abs_idx = stream_idx + self.offset;
+ if stream_abs_idx >= self.limit {
+ return Poll::Ready(Some(Err(DataFusionError::Internal(format!(
+ "Invalid stream index {} for offset {} and limit {}",
+ stream_idx, self.offset, self.limit
+ )))));
+ }
+ Poll::Ready(ready!(self.streams.lock().poll_next(cx, stream_abs_idx)))
Review Comment:
Mutex removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]