alamb commented on code in PR #12302:
URL: https://github.com/apache/datafusion/pull/12302#discussion_r1745254315


##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -156,12 +164,22 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         }
         // try to initialize the loser tree
         if self.loser_tree.is_empty() {
-            // Ensure all non-exhausted streams have a cursor from which
-            // rows can be pulled
-            for i in 0..self.streams.partitions() {
-                if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) {
-                    self.aborted = true;
-                    return Poll::Ready(Some(Err(e)));
+            // Ensure all non-exhausted streams have a cursor from which rows can be pulled

Review Comment:
   👍  -- response in https://github.com/synnada-ai/datafusion-upstream/pull/34#issuecomment-2331231064
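
   For context on why the removed loop can stall: it `ready!`s each partition in index order, so if partition 0 reports `Pending`, later partitions are never polled during initialization. The standalone sketch below (invented names and mock poll results, not the actual `SortPreservingMergeStream` code) shows that shape of the problem, which the `CongestedStream` test input further down models:

   ```rust
   use std::task::Poll;

   // Simplified stand-in for the removed initialization loop: bail out on the
   // first partition that is not ready, so later partitions are never polled.
   fn init_cursors(polls: &[Poll<Option<u32>>]) -> Poll<()> {
       for (i, p) in polls.iter().enumerate() {
           match p {
               Poll::Pending => {
                   println!("partition {i} is pending; partitions after it are never polled");
                   return Poll::Pending;
               }
               Poll::Ready(_) => println!("partition {i} produced a cursor"),
           }
       }
       Poll::Ready(())
   }

   fn main() {
       // Partition 0 mimics CongestedStream: it stays Pending until partition 1
       // is polled -- which the sequential scheme above never does.
       let polls = [Poll::Pending, Poll::Ready(Some(1))];
       assert_eq!(init_cursors(&polls), Poll::Pending);
   }
   ```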



##########
datafusion/core/tests/fuzz_cases/merge_fuzz.rs:
##########
@@ -160,3 +179,142 @@ fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
     v1.extend(v2);
     v1
 }
+
+/// It returns pending for the 1st partition until the 2nd partition is polled.
+#[derive(Debug, Clone)]
+struct CongestedExec {
+    schema: Schema,
+    cache: PlanProperties,
+    congestion_cleared: Arc<Mutex<bool>>,
+}
+
+impl CongestedExec {
+    fn compute_properties(schema: SchemaRef) -> PlanProperties {
+        let columns = schema
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(i, f)| Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>)
+            .collect::<Vec<_>>();
+        let mut eq_properties = EquivalenceProperties::new(schema);
+        eq_properties.add_new_orderings(vec![columns
+            .iter()
+            .map(|expr| PhysicalSortExpr::new(expr.clone(), SortOptions::default()))
+            .collect::<Vec<_>>()]);
+        let mode = ExecutionMode::Unbounded;
+        PlanProperties::new(eq_properties, Partitioning::Hash(columns, 2), mode)
+    }
+}
+
+impl ExecutionPlan for CongestedExec {
+    fn name(&self) -> &'static str {
+        Self::static_name()
+    }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CongestedStream {
+            schema: Arc::new(self.schema.clone()),
+            congestion_cleared: self.congestion_cleared.clone(),
+            partition,
+        }))
+    }
+}
+
+impl DisplayAs for CongestedExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CongestedExec",).unwrap()
+            }
+        }
+        Ok(())
+    }
+}
+
+/// It returns pending for the 1st partition until the 2nd partition is polled.
+#[derive(Debug)]
+pub struct CongestedStream {
+    schema: SchemaRef,
+    congestion_cleared: Arc<Mutex<bool>>,
+    partition: usize,
+}
+
+impl Stream for CongestedStream {
+    type Item = Result<RecordBatch>;
+    fn poll_next(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        match self.partition {
+            0 => {
+                let cleared = self.congestion_cleared.lock().unwrap();
+                if *cleared {
+                    Poll::Ready(None)
+                } else {
+                    Poll::Pending
+                }
+            }
+            1 => {
+                let mut cleared = self.congestion_cleared.lock().unwrap();
+                *cleared = true;
+                Poll::Ready(None)
+            }
+            _ => unreachable!(),
+        }
+    }
+}
+
+impl RecordBatchStream for CongestedStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+#[tokio::test]
+async fn test_spm_congestion() -> Result<()> {

Review Comment:
   I read this test a bit more -- it doesn't seem like it is actually a fuzz test (it doesn't have any random inputs, for example).

   I think it would make more sense to put it with the other sort preserving merge tests: https://github.com/apache/datafusion/blob/6034be42808b43e3f48f6e58ec38cc35fa253abb/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L301-L302
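
   For reference, here is a rough, hypothetical sketch of how such a test might drive `CongestedExec` through a `SortPreservingMergeExec` (wherever it ends up living). The wiring, names, and the `tokio::time::timeout` bound are my assumptions, not the PR's actual test body, and the imports are assumed to match the surrounding test file:

   ```rust
   // Hypothetical sketch only -- not the PR's test. Assumes the usual imports from
   // this file plus `datafusion::physical_plan::collect`,
   // `datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec`,
   // and `datafusion_execution::TaskContext`.
   #[tokio::test]
   async fn test_spm_congestion_sketch() -> Result<()> {
       let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
       let source = CongestedExec {
           schema: schema.clone(),
           cache: CongestedExec::compute_properties(Arc::new(schema.clone())),
           congestion_cleared: Arc::new(Mutex::new(false)),
       };
       let spm = SortPreservingMergeExec::new(
           vec![PhysicalSortExpr::new(
               Arc::new(Column::new("c1", 0)),
               SortOptions::default(),
           )],
           Arc::new(source),
       );
       let task_ctx = Arc::new(TaskContext::default());
       // Bound the run: if the merge stalls on the congested partition, the test
       // fails with a timeout instead of hanging CI.
       let result = tokio::time::timeout(
           std::time::Duration::from_secs(3),
           collect(Arc::new(spm), task_ctx),
       )
       .await;
       assert!(
           result.is_ok(),
           "SortPreservingMerge should not get stuck on a congested partition"
       );
       Ok(())
   }
   ```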


