This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 87a67d4b19 [Bug fix] Source projection output ordering (#6136)
87a67d4b19 is described below
commit 87a67d4b190d5849969ead52fe75f4d6fccbbdec
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Apr 28 09:43:07 2023 +0300
[Bug fix] Source projection output ordering (#6136)
* Bug fix at source projection
* Simplifications
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../core/src/physical_plan/file_format/avro.rs | 9 ++--
.../core/src/physical_plan/file_format/csv.rs | 9 ++--
.../src/physical_plan/file_format/file_stream.rs | 2 +-
.../core/src/physical_plan/file_format/json.rs | 11 +++--
.../core/src/physical_plan/file_format/mod.rs | 54 ++++++++++++++++------
.../core/src/physical_plan/file_format/parquet.rs | 9 ++--
datafusion/core/src/test_util/mod.rs | 10 +++-
datafusion/core/tests/sql/parquet.rs | 2 +-
datafusion/core/tests/sql/projection.rs | 30 +++++++++++-
9 files changed, 103 insertions(+), 33 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs
b/datafusion/core/src/physical_plan/file_format/avro.rs
index 7a4883b0a9..87d3ca1ec7 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::metrics::{ExecutionPlanMetricsSet,
MetricsSet};
use std::any::Any;
use std::sync::Arc;
-use super::{get_output_ordering, FileScanConfig};
+use super::FileScanConfig;
/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
@@ -37,6 +37,7 @@ pub struct AvroExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
+ projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -44,12 +45,14 @@ pub struct AvroExec {
impl AvroExec {
/// Create a new Avro reader execution plan provided base configurations
pub fn new(base_config: FileScanConfig) -> Self {
- let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
Self {
base_config,
projected_schema,
projected_statistics,
+ projected_output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
}
}
@@ -77,7 +80,7 @@ impl ExecutionPlan for AvroExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- get_output_ordering(&self.base_config)
+ self.projected_output_ordering.as_deref()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 8c8d89e38f..886c609e72 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -46,7 +46,7 @@ use std::sync::Arc;
use std::task::Poll;
use tokio::task::{self, JoinHandle};
-use super::{get_output_ordering, FileScanConfig};
+use super::FileScanConfig;
/// Execution plan for scanning a CSV file
#[derive(Debug, Clone)]
@@ -54,6 +54,7 @@ pub struct CsvExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
+ projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
has_header: bool,
delimiter: u8,
/// Execution metrics
@@ -69,12 +70,14 @@ impl CsvExec {
delimiter: u8,
file_compression_type: FileCompressionType,
) -> Self {
- let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
Self {
base_config,
projected_schema,
projected_statistics,
+ projected_output_ordering,
has_header,
delimiter,
metrics: ExecutionPlanMetricsSet::new(),
@@ -118,7 +121,7 @@ impl ExecutionPlan for CsvExec {
/// See comments on `impl ExecutionPlan for ParquetExec`: output order
can't be
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- get_output_ordering(&self.base_config)
+ self.projected_output_ordering.as_deref()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs
b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 538798ead1..d8d1b40726 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -218,7 +218,7 @@ impl<F: FileOpener> FileStream<F> {
file_reader: F,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
- let (projected_schema, _) = config.project();
+ let (projected_schema, ..) = config.project();
let pc_projector = PartitionColumnProjector::new(
projected_schema.clone(),
&config
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index 61ca912ef9..cef69883e9 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -44,7 +44,7 @@ use std::sync::Arc;
use std::task::Poll;
use tokio::task::{self, JoinHandle};
-use super::{get_output_ordering, FileScanConfig};
+use super::FileScanConfig;
/// Execution plan for scanning NdJson data source
#[derive(Debug, Clone)]
@@ -52,6 +52,7 @@ pub struct NdJsonExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
+ projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
file_compression_type: FileCompressionType,
@@ -63,12 +64,14 @@ impl NdJsonExec {
base_config: FileScanConfig,
file_compression_type: FileCompressionType,
) -> Self {
- let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
Self {
base_config,
projected_schema,
projected_statistics,
+ projected_output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
}
@@ -98,7 +101,7 @@ impl ExecutionPlan for NdJsonExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- get_output_ordering(&self.base_config)
+ self.projected_output_ordering.as_deref()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -118,7 +121,7 @@ impl ExecutionPlan for NdJsonExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let batch_size = context.session_config().batch_size();
- let (projected_schema, _) = self.base_config.project();
+ let (projected_schema, ..) = self.base_config.project();
let object_store = context
.runtime_env()
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 5fffc628af..4f85f54113 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -53,6 +53,7 @@ use crate::{
use arrow::array::new_null_array;
use arrow::record_batch::RecordBatchOptions;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
+use datafusion_physical_expr::expressions::Column;
use log::{debug, info, warn};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -158,9 +159,13 @@ pub struct FileScanConfig {
impl FileScanConfig {
/// Project the schema and the statistics on the given column indices
- fn project(&self) -> (SchemaRef, Statistics) {
+    fn project(&self) -> (SchemaRef, Statistics, Option<Vec<PhysicalSortExpr>>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
- return (Arc::clone(&self.file_schema), self.statistics.clone());
+ return (
+ Arc::clone(&self.file_schema),
+ self.statistics.clone(),
+ self.output_ordering.clone(),
+ );
}
let proj_iter: Box<dyn Iterator<Item = usize>> = match
&self.projection {
@@ -203,8 +208,9 @@ impl FileScanConfig {
let table_schema = Arc::new(
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
);
-
- (table_schema, table_stats)
+ let projected_output_ordering =
+ get_projected_output_ordering(self, &table_schema);
+ (table_schema, table_stats, projected_output_ordering)
}
#[allow(unused)] // Only used by avro
@@ -704,17 +710,35 @@ impl From<ObjectMeta> for FileMeta {
///
/// ParquetExec
///```
-pub(crate) fn get_output_ordering(
+fn get_projected_output_ordering(
base_config: &FileScanConfig,
-) -> Option<&[PhysicalSortExpr]> {
- base_config.output_ordering.as_ref()
- .map(|output_ordering| if base_config.file_groups.iter().any(|group|
group.len() > 1) {
+ projected_schema: &SchemaRef,
+) -> Option<Vec<PhysicalSortExpr>> {
+ let mut new_ordering = vec![];
+ if let Some(output_ordering) = &base_config.output_ordering {
+ if base_config.file_groups.iter().any(|group| group.len() > 1) {
debug!("Skipping specified output ordering {:?}. Some file group
had more than one file: {:?}",
output_ordering, base_config.file_groups);
- None
- } else {
- Some(output_ordering.as_slice())
- }).unwrap_or_else(|| None)
+ return None;
+ }
+ for PhysicalSortExpr { expr, options } in output_ordering {
+ if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+ let name = col.name();
+                if let Some((idx, _)) = projected_schema.column_with_name(name) {
+                    // Compute the new sort expression (with correct index) after projection:
+ new_ordering.push(PhysicalSortExpr {
+ expr: Arc::new(Column::new(name, idx)),
+ options: *options,
+ });
+ continue;
+ }
+ }
+ // Cannot find expression in the projected_schema, stop iterating
+ // since rest of the orderings are violated
+ break;
+ }
+ }
+ (!new_ordering.is_empty()).then_some(new_ordering)
}
#[cfg(test)]
@@ -741,7 +765,7 @@ mod tests {
)],
);
- let (proj_schema, proj_statistics) = conf.project();
+ let (proj_schema, proj_statistics, _) = conf.project();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
proj_schema.field(file_schema.fields().len()).name(),
@@ -790,7 +814,7 @@ mod tests {
)],
);
- let (proj_schema, proj_statistics) = conf.project();
+ let (proj_schema, proj_statistics, _) = conf.project();
assert_eq!(
columns(&proj_schema),
vec!["date".to_owned(), "c1".to_owned()]
@@ -845,7 +869,7 @@ mod tests {
Statistics::default(),
partition_cols.clone(),
);
- let (proj_schema, _) = conf.project();
+ let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
let mut proj = PartitionColumnProjector::new(
proj_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 6d9e35c32e..c0d4fb306d 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -70,8 +70,6 @@ use crate::physical_plan::common::AbortOnDropSingle;
use
crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
pub use metrics::ParquetFileMetrics;
-use super::get_output_ordering;
-
#[derive(Default)]
struct RepartitionState {
current_partition_index: usize,
@@ -94,6 +92,7 @@ pub struct ParquetExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
+ projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
@@ -151,7 +150,8 @@ impl ParquetExec {
}
});
- let (projected_schema, projected_statistics) = base_config.project();
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
Self {
pushdown_filters: None,
@@ -160,6 +160,7 @@ impl ParquetExec {
base_config,
projected_schema,
projected_statistics,
+ projected_output_ordering,
metrics,
predicate,
pruning_predicate,
@@ -343,7 +344,7 @@ impl ExecutionPlan for ParquetExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- get_output_ordering(&self.base_config)
+ self.projected_output_ordering.as_deref()
}
fn with_new_children(
diff --git a/datafusion/core/src/test_util/mod.rs
b/datafusion/core/src/test_util/mod.rs
index 5ac602a085..9035b6d893 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -339,16 +339,24 @@ fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
// Return a static RecordBatch and its ordering for tests. RecordBatch is
ordered by a, b, c
fn get_test_data2() -> Result<(RecordBatch, Vec<Expr>)> {
+ let a0 = Field::new("a0", DataType::Int32, false);
let a = Field::new("a", DataType::Int32, false);
let b = Field::new("b", DataType::Int32, false);
let c = Field::new("c", DataType::Int32, false);
let d = Field::new("d", DataType::Int32, false);
- let schema = Arc::new(Schema::new(vec![a, b, c, d]));
+ let schema = Arc::new(Schema::new(vec![a0, a, b, c, d]));
let batch = RecordBatch::try_new(
schema,
vec![
+ Arc::new(Int32Array::from_slice([
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1,
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
+ 0, 0, 0, 0,
+ ])),
Arc::new(Int32Array::from_slice([
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
diff --git a/datafusion/core/tests/sql/parquet.rs
b/datafusion/core/tests/sql/parquet.rs
index 5aa81e047b..d2998209a2 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -79,7 +79,7 @@ async fn parquet_with_sort_order_specified() {
// This string appears in ParquetExec if the output ordering is
// specified
let expected_output_ordering =
-        "output_ordering=[string_col@9 ASC NULLS LAST, int_col@4 ASC NULLS LAST]";
+        "output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]";
// when sort not specified, should not appear in the explain plan
let num_files = 1;
diff --git a/datafusion/core/tests/sql/projection.rs
b/datafusion/core/tests/sql/projection.rs
index ac697b1176..5f480d46c6 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -16,7 +16,7 @@
// under the License.
use datafusion::datasource::provider_as_source;
-use datafusion::test_util::scan_empty;
+use datafusion::test_util::{get_test_context2, scan_empty};
use datafusion_expr::{when, LogicalPlanBuilder, UNNAMED_TABLE};
use tempfile::TempDir;
@@ -375,3 +375,31 @@ async fn project_columns_in_memory_without_propagation()
-> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_source_projection() -> Result<()> {
+ let session_config = SessionConfig::new().with_target_partitions(1);
+ // Source is ordered by a, b, c
+ // Source is finite.
+ let tmpdir1 = TempDir::new()?;
+ let ctx = get_test_context2(&tmpdir1, false, session_config).await?;
+ let sql = "SELECT a FROM annotated_data
+ ORDER BY a
+ LIMIT 5";
+
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(sql).await.expect(&msg);
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ // Final plan shouldn't include SortExec.
+ let expected: Vec<&str> = { vec!["GlobalLimitExec: skip=0, fetch=5"] };
+
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ let actual_len = actual.len();
+ let actual_trim_last = &actual[..actual_len - 1];
+ assert_eq!(
+ expected, actual_trim_last,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+ Ok(())
+}