liamzwbao commented on code in PR #17051:
URL: https://github.com/apache/datafusion/pull/17051#discussion_r2373764889


##########
datafusion/core/tests/physical_optimizer/partition_statistics.rs:
##########
@@ -387,6 +388,64 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_statistics_by_partition_of_interleave() -> Result<()> {
+        let scan1 = create_scan_exec_with_statistics(None, Some(1)).await;
+        let scan2 = create_scan_exec_with_statistics(None, Some(1)).await;
+
+        // Create same hash partitioning on the 'id' column as InterleaveExec
+        // requires all children have a consistent hash partitioning
+        let hash_expr1 = vec![col("id", &scan1.schema())?];
+        let repartition1 = Arc::new(RepartitionExec::try_new(
+            scan1,
+            Partitioning::Hash(hash_expr1, 2),
+        )?);
+        let hash_expr2 = vec![col("id", &scan2.schema())?];
+        let repartition2 = Arc::new(RepartitionExec::try_new(
+            scan2,
+            Partitioning::Hash(hash_expr2, 2),
+        )?);
+
+        let interleave: Arc<dyn ExecutionPlan> =
+            Arc::new(InterleaveExec::try_new(vec![repartition1, 
repartition2])?);
+
+        // Verify the result of partition statistics
+        let stats = (0..interleave.output_partitioning().partition_count())
+            .map(|idx| interleave.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+        assert_eq!(stats.len(), 2);
+
+        let expected_stats = Statistics {
+            num_rows: Precision::Inexact(4),
+            total_byte_size: Precision::Inexact(220),
+            column_statistics: vec![
+                ColumnStatistics::new_unknown(),
+                ColumnStatistics::new_unknown(),
+            ],
+        };
+        assert_eq!(stats[0], expected_stats);
+        assert_eq!(stats[1], expected_stats);
+
+        // Verify the execution results
+        let partitions = execute_stream_partitioned(
+            interleave.clone(),
+            Arc::new(TaskContext::default()),
+        )?;
+        assert_eq!(partitions.len(), 2);
+
+        let mut partition_row_counts = Vec::new();
+        for partition_stream in partitions.into_iter() {
+            let results: Vec<RecordBatch> = 
partition_stream.try_collect().await?;
+            let total_rows: usize = results.iter().map(|batch| 
batch.num_rows()).sum();
+            partition_row_counts.push(total_rows);
+        }
+        assert_eq!(partition_row_counts.len(), 2);
+        assert_eq!(partition_row_counts[0], 2);
+        assert_eq!(partition_row_counts[1], 6);

Review Comment:
   The reason is that hash repartitioning doesn’t distribute the rows 
evenly across the output partitions. I added a test for repartition to confirm 
this inconsistency. Note that the stats in repartition are marked as `Inexact`, 
and `Interleave` only combines the results of its children's partitions.
   
   Sorry for the late reply; I was out for the past 2 weeks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to