alamb commented on code in PR #12302: URL: https://github.com/apache/datafusion/pull/12302#discussion_r1745254315
########## datafusion/physical-plan/src/sorts/merge.rs: ########## @@ -156,12 +164,22 @@ impl<C: CursorValues> SortPreservingMergeStream<C> { } // try to initialize the loser tree if self.loser_tree.is_empty() { - // Ensure all non-exhausted streams have a cursor from which - // rows can be pulled - for i in 0..self.streams.partitions() { - if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) { - self.aborted = true; - return Poll::Ready(Some(Err(e))); + // Ensure all non-exhausted streams have a cursor from which rows can be pulled Review Comment: 👍 -- response in https://github.com/synnada-ai/datafusion-upstream/pull/34#issuecomment-2331231064 ########## datafusion/core/tests/fuzz_cases/merge_fuzz.rs: ########## @@ -160,3 +179,142 @@ fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> { v1.extend(v2); v1 } + +/// It returns pending for the 1st partition until the 2nd partition is polled. +#[derive(Debug, Clone)] +struct CongestedExec { + schema: Schema, + cache: PlanProperties, + congestion_cleared: Arc<Mutex<bool>>, +} + +impl CongestedExec { + fn compute_properties(schema: SchemaRef) -> PlanProperties { + let columns = schema + .fields + .iter() + .enumerate() + .map(|(i, f)| Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>) + .collect::<Vec<_>>(); + let mut eq_properties = EquivalenceProperties::new(schema); + eq_properties.add_new_orderings(vec![columns + .iter() + .map(|expr| PhysicalSortExpr::new(expr.clone(), SortOptions::default())) + .collect::<Vec<_>>()]); + let mode = ExecutionMode::Unbounded; + PlanProperties::new(eq_properties, Partitioning::Hash(columns, 2), mode) + } +} + +impl ExecutionPlan for CongestedExec { + fn name(&self) -> &'static str { + Self::static_name() + } + fn as_any(&self) -> &dyn Any { + self + } + fn properties(&self) -> &PlanProperties { + &self.cache + } + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + fn with_new_children( + self: Arc<Self>, + _: Vec<Arc<dyn 
ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(self) + } + fn execute( + &self, + partition: usize, + _context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + Ok(Box::pin(CongestedStream { + schema: Arc::new(self.schema.clone()), + congestion_cleared: self.congestion_cleared.clone(), + partition, + })) + } +} + +impl DisplayAs for CongestedExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "CongestedExec",).unwrap() + } + } + Ok(()) + } +} + +/// It returns pending for the 1st partition until the 2nd partition is polled. +#[derive(Debug)] +pub struct CongestedStream { + schema: SchemaRef, + congestion_cleared: Arc<Mutex<bool>>, + partition: usize, +} + +impl Stream for CongestedStream { + type Item = Result<RecordBatch>; + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + match self.partition { + 0 => { + let cleared = self.congestion_cleared.lock().unwrap(); + if *cleared { + Poll::Ready(None) + } else { + Poll::Pending + } + } + 1 => { + let mut cleared = self.congestion_cleared.lock().unwrap(); + *cleared = true; + Poll::Ready(None) + } + _ => unreachable!(), + } + } +} + +impl RecordBatchStream for CongestedStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[tokio::test] +async fn test_spm_congestion() -> Result<()> { Review Comment: I read this test a bit more -- it doesn't seem like it is actually a fuzz test (for example, it doesn't seem to have any random inputs). I think it would make more sense to put it with the other sort-preserving merge tests: https://github.com/apache/datafusion/blob/6034be42808b43e3f48f6e58ec38cc35fa253abb/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L301-L302 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org