This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 87a67d4b19 [Bug fix] Source projection output ordering (#6136)
87a67d4b19 is described below

commit 87a67d4b190d5849969ead52fe75f4d6fccbbdec
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Apr 28 09:43:07 2023 +0300

    [Bug fix] Source projection output ordering (#6136)
    
    * Bug fix at source projection
    
    * Simplifications
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../core/src/physical_plan/file_format/avro.rs     |  9 ++--
 .../core/src/physical_plan/file_format/csv.rs      |  9 ++--
 .../src/physical_plan/file_format/file_stream.rs   |  2 +-
 .../core/src/physical_plan/file_format/json.rs     | 11 +++--
 .../core/src/physical_plan/file_format/mod.rs      | 54 ++++++++++++++++------
 .../core/src/physical_plan/file_format/parquet.rs  |  9 ++--
 datafusion/core/src/test_util/mod.rs               | 10 +++-
 datafusion/core/tests/sql/parquet.rs               |  2 +-
 datafusion/core/tests/sql/projection.rs            | 30 +++++++++++-
 9 files changed, 103 insertions(+), 33 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 7a4883b0a9..87d3ca1ec7 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{get_output_ordering, FileScanConfig};
+use super::FileScanConfig;
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
@@ -37,6 +37,7 @@ pub struct AvroExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
+    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -44,12 +45,14 @@ pub struct AvroExec {
 impl AvroExec {
     /// Create a new Avro reader execution plan provided base configurations
     pub fn new(base_config: FileScanConfig) -> Self {
-        let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
 
         Self {
             base_config,
             projected_schema,
             projected_statistics,
+            projected_output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
@@ -77,7 +80,7 @@ impl ExecutionPlan for AvroExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        get_output_ordering(&self.base_config)
+        self.projected_output_ordering.as_deref()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 8c8d89e38f..886c609e72 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -46,7 +46,7 @@ use std::sync::Arc;
 use std::task::Poll;
 use tokio::task::{self, JoinHandle};
 
-use super::{get_output_ordering, FileScanConfig};
+use super::FileScanConfig;
 
 /// Execution plan for scanning a CSV file
 #[derive(Debug, Clone)]
@@ -54,6 +54,7 @@ pub struct CsvExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
+    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
     has_header: bool,
     delimiter: u8,
     /// Execution metrics
@@ -69,12 +70,14 @@ impl CsvExec {
         delimiter: u8,
         file_compression_type: FileCompressionType,
     ) -> Self {
-        let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
 
         Self {
             base_config,
             projected_schema,
             projected_statistics,
+            projected_output_ordering,
             has_header,
             delimiter,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -118,7 +121,7 @@ impl ExecutionPlan for CsvExec {
 
     /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        get_output_ordering(&self.base_config)
+        self.projected_output_ordering.as_deref()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 538798ead1..d8d1b40726 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -218,7 +218,7 @@ impl<F: FileOpener> FileStream<F> {
         file_reader: F,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Self> {
-        let (projected_schema, _) = config.project();
+        let (projected_schema, ..) = config.project();
         let pc_projector = PartitionColumnProjector::new(
             projected_schema.clone(),
             &config
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 61ca912ef9..cef69883e9 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -44,7 +44,7 @@ use std::sync::Arc;
 use std::task::Poll;
 use tokio::task::{self, JoinHandle};
 
-use super::{get_output_ordering, FileScanConfig};
+use super::FileScanConfig;
 
 /// Execution plan for scanning NdJson data source
 #[derive(Debug, Clone)]
@@ -52,6 +52,7 @@ pub struct NdJsonExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
+    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     file_compression_type: FileCompressionType,
@@ -63,12 +64,14 @@ impl NdJsonExec {
         base_config: FileScanConfig,
         file_compression_type: FileCompressionType,
     ) -> Self {
-        let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
 
         Self {
             base_config,
             projected_schema,
             projected_statistics,
+            projected_output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
             file_compression_type,
         }
@@ -98,7 +101,7 @@ impl ExecutionPlan for NdJsonExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        get_output_ordering(&self.base_config)
+        self.projected_output_ordering.as_deref()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -118,7 +121,7 @@ impl ExecutionPlan for NdJsonExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let batch_size = context.session_config().batch_size();
-        let (projected_schema, _) = self.base_config.project();
+        let (projected_schema, ..) = self.base_config.project();
 
         let object_store = context
             .runtime_env()
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 5fffc628af..4f85f54113 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -53,6 +53,7 @@ use crate::{
 use arrow::array::new_null_array;
 use arrow::record_batch::RecordBatchOptions;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
+use datafusion_physical_expr::expressions::Column;
 use log::{debug, info, warn};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -158,9 +159,13 @@ pub struct FileScanConfig {
 
 impl FileScanConfig {
     /// Project the schema and the statistics on the given column indices
-    fn project(&self) -> (SchemaRef, Statistics) {
+    fn project(&self) -> (SchemaRef, Statistics, Option<Vec<PhysicalSortExpr>>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
-            return (Arc::clone(&self.file_schema), self.statistics.clone());
+            return (
+                Arc::clone(&self.file_schema),
+                self.statistics.clone(),
+                self.output_ordering.clone(),
+            );
         }
 
         let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection {
@@ -203,8 +208,9 @@ impl FileScanConfig {
         let table_schema = Arc::new(
             Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
         );
-
-        (table_schema, table_stats)
+        let projected_output_ordering =
+            get_projected_output_ordering(self, &table_schema);
+        (table_schema, table_stats, projected_output_ordering)
     }
 
     #[allow(unused)] // Only used by avro
@@ -704,17 +710,35 @@ impl From<ObjectMeta> for FileMeta {
 ///
 ///              ParquetExec
 ///```
-pub(crate) fn get_output_ordering(
+fn get_projected_output_ordering(
     base_config: &FileScanConfig,
-) -> Option<&[PhysicalSortExpr]> {
-    base_config.output_ordering.as_ref()
-        .map(|output_ordering| if base_config.file_groups.iter().any(|group| group.len() > 1) {
+    projected_schema: &SchemaRef,
+) -> Option<Vec<PhysicalSortExpr>> {
+    let mut new_ordering = vec![];
+    if let Some(output_ordering) = &base_config.output_ordering {
+        if base_config.file_groups.iter().any(|group| group.len() > 1) {
             debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
                    output_ordering, base_config.file_groups);
-            None
-        } else {
-            Some(output_ordering.as_slice())
-        }).unwrap_or_else(|| None)
+            return None;
+        }
+        for PhysicalSortExpr { expr, options } in output_ordering {
+            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                let name = col.name();
+                if let Some((idx, _)) = projected_schema.column_with_name(name) {
+                    // Compute the new sort expression (with correct index) after projection:
+                    new_ordering.push(PhysicalSortExpr {
+                        expr: Arc::new(Column::new(name, idx)),
+                        options: *options,
+                    });
+                    continue;
+                }
+            }
+            // Cannot find expression in the projected_schema, stop iterating
+            // since rest of the orderings are violated
+            break;
+        }
+    }
+    (!new_ordering.is_empty()).then_some(new_ordering)
 }
 
 #[cfg(test)]
@@ -741,7 +765,7 @@ mod tests {
             )],
         );
 
-        let (proj_schema, proj_statistics) = conf.project();
+        let (proj_schema, proj_statistics, _) = conf.project();
         assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
         assert_eq!(
             proj_schema.field(file_schema.fields().len()).name(),
@@ -790,7 +814,7 @@ mod tests {
             )],
         );
 
-        let (proj_schema, proj_statistics) = conf.project();
+        let (proj_schema, proj_statistics, _) = conf.project();
         assert_eq!(
             columns(&proj_schema),
             vec!["date".to_owned(), "c1".to_owned()]
@@ -845,7 +869,7 @@ mod tests {
             Statistics::default(),
             partition_cols.clone(),
         );
-        let (proj_schema, _) = conf.project();
+        let (proj_schema, ..) = conf.project();
         // created a projector for that projected schema
         let mut proj = PartitionColumnProjector::new(
             proj_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 6d9e35c32e..c0d4fb306d 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -70,8 +70,6 @@ use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
 pub use metrics::ParquetFileMetrics;
 
-use super::get_output_ordering;
-
 #[derive(Default)]
 struct RepartitionState {
     current_partition_index: usize,
@@ -94,6 +92,7 @@ pub struct ParquetExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
+    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
@@ -151,7 +150,8 @@ impl ParquetExec {
             }
         });
 
-        let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
 
         Self {
             pushdown_filters: None,
@@ -160,6 +160,7 @@ impl ParquetExec {
             base_config,
             projected_schema,
             projected_statistics,
+            projected_output_ordering,
             metrics,
             predicate,
             pruning_predicate,
@@ -343,7 +344,7 @@ impl ExecutionPlan for ParquetExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        get_output_ordering(&self.base_config)
+        self.projected_output_ordering.as_deref()
     }
 
     fn with_new_children(
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 5ac602a085..9035b6d893 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -339,16 +339,24 @@ fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
 
 // Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by a, b, c
 fn get_test_data2() -> Result<(RecordBatch, Vec<Expr>)> {
+    let a0 = Field::new("a0", DataType::Int32, false);
     let a = Field::new("a", DataType::Int32, false);
     let b = Field::new("b", DataType::Int32, false);
     let c = Field::new("c", DataType::Int32, false);
     let d = Field::new("d", DataType::Int32, false);
 
-    let schema = Arc::new(Schema::new(vec![a, b, c, d]));
+    let schema = Arc::new(Schema::new(vec![a0, a, b, c, d]));
 
     let batch = RecordBatch::try_new(
         schema,
         vec![
+            Arc::new(Int32Array::from_slice([
+                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1,
+                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1,
+                1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0,
+                0, 0, 0, 0,
+            ])),
             Arc::new(Int32Array::from_slice([
                 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 5aa81e047b..d2998209a2 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -79,7 +79,7 @@ async fn parquet_with_sort_order_specified() {
     // This string appears in ParquetExec if the output ordering is
     // specified
     let expected_output_ordering =
-        "output_ordering=[string_col@9 ASC NULLS LAST, int_col@4 ASC NULLS LAST]";
+        "output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]";
 
     // when sort not specified, should not appear in the explain plan
     let num_files = 1;
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index ac697b1176..5f480d46c6 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use datafusion::datasource::provider_as_source;
-use datafusion::test_util::scan_empty;
+use datafusion::test_util::{get_test_context2, scan_empty};
 use datafusion_expr::{when, LogicalPlanBuilder, UNNAMED_TABLE};
 use tempfile::TempDir;
 
@@ -375,3 +375,31 @@ async fn project_columns_in_memory_without_propagation() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_source_projection() -> Result<()> {
+    let session_config = SessionConfig::new().with_target_partitions(1);
+    // Source is ordered by a, b, c
+    // Source is finite.
+    let tmpdir1 = TempDir::new()?;
+    let ctx = get_test_context2(&tmpdir1, false, session_config).await?;
+    let sql = "SELECT a FROM annotated_data
+        ORDER BY a
+        LIMIT 5";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Final plan shouldn't include SortExec.
+    let expected: Vec<&str> = { vec!["GlobalLimitExec: skip=0, fetch=5"] };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}

Reply via email to