alamb commented on code in PR #12302:
URL: https://github.com/apache/datafusion/pull/12302#discussion_r1745254315
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -156,12 +164,22 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
}
// try to initialize the loser tree
if self.loser_tree.is_empty() {
- // Ensure all non-exhausted streams have a cursor from which
- // rows can be pulled
- for i in 0..self.streams.partitions() {
- if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) {
- self.aborted = true;
- return Poll::Ready(Some(Err(e)));
+ // Ensure all non-exhausted streams have a cursor from which rows
can be pulled
Review Comment:
👍 -- response in
https://github.com/synnada-ai/datafusion-upstream/pull/34#issuecomment-2331231064
##########
datafusion/core/tests/fuzz_cases/merge_fuzz.rs:
##########
@@ -160,3 +179,142 @@ fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>)
-> Vec<RecordBatch> {
v1.extend(v2);
v1
}
+
+/// It returns pending for the 1st partition until the 2nd partition is polled.
+#[derive(Debug, Clone)]
+struct CongestedExec {
+ schema: Schema,
+ cache: PlanProperties,
+ congestion_cleared: Arc<Mutex<bool>>,
+}
+
+impl CongestedExec {
+ fn compute_properties(schema: SchemaRef) -> PlanProperties {
+ let columns = schema
+ .fields
+ .iter()
+ .enumerate()
+ .map(|(i, f)| Arc::new(Column::new(f.name(), i)) as Arc<dyn
PhysicalExpr>)
+ .collect::<Vec<_>>();
+ let mut eq_properties = EquivalenceProperties::new(schema);
+ eq_properties.add_new_orderings(vec![columns
+ .iter()
+ .map(|expr| PhysicalSortExpr::new(expr.clone(),
SortOptions::default()))
+ .collect::<Vec<_>>()]);
+ let mode = ExecutionMode::Unbounded;
+ PlanProperties::new(eq_properties, Partitioning::Hash(columns, 2),
mode)
+ }
+}
+
+impl ExecutionPlan for CongestedExec {
+ fn name(&self) -> &'static str {
+ Self::static_name()
+ }
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn properties(&self) -> &PlanProperties {
+ &self.cache
+ }
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+ fn execute(
+ &self,
+ partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ Ok(Box::pin(CongestedStream {
+ schema: Arc::new(self.schema.clone()),
+ congestion_cleared: self.congestion_cleared.clone(),
+ partition,
+ }))
+ }
+}
+
+impl DisplayAs for CongestedExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CongestedExec",).unwrap()
+ }
+ }
+ Ok(())
+ }
+}
+
+/// It returns pending for the 1st partition until the 2nd partition is polled.
+#[derive(Debug)]
+pub struct CongestedStream {
+ schema: SchemaRef,
+ congestion_cleared: Arc<Mutex<bool>>,
+ partition: usize,
+}
+
+impl Stream for CongestedStream {
+ type Item = Result<RecordBatch>;
+ fn poll_next(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ match self.partition {
+ 0 => {
+ let cleared = self.congestion_cleared.lock().unwrap();
+ if *cleared {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+ 1 => {
+ let mut cleared = self.congestion_cleared.lock().unwrap();
+ *cleared = true;
+ Poll::Ready(None)
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl RecordBatchStream for CongestedStream {
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
+
+#[tokio::test]
+async fn test_spm_congestion() -> Result<()> {
Review Comment:
I read this test a bit more -- it doesn't seem to actually be a fuzz
test (e.g. it doesn't seem to have any random inputs).
I think it would make more sense to put it with the other sort-preserving
merge tests:
https://github.com/apache/datafusion/blob/6034be42808b43e3f48f6e58ec38cc35fa253abb/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L301-L302
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]