This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 986be19dcd Implement tree explain for `DataSourceExec` (#15029)
986be19dcd is described below
commit 986be19dcdae3cbd6acc0fb91202c445a71cf037
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 6 10:39:56 2025 -0500
Implement tree explain for `DataSourceExec` (#15029)
* Implement tree explain for DataSourceExec
* improve test
* Apply suggestions from code review
Co-authored-by: Oleks V <[email protected]>
* fmt
---------
Co-authored-by: Oleks V <[email protected]>
---
datafusion/datasource-csv/src/source.rs | 9 +-
datafusion/datasource-parquet/src/source.rs | 11 +-
datafusion/datasource/src/file_scan_config.rs | 5 +-
datafusion/datasource/src/memory.rs | 30 +--
datafusion/datasource/src/source.rs | 3 +-
.../sqllogictest/test_files/explain_tree.slt | 223 +++++++++++++++++++--
6 files changed, 237 insertions(+), 44 deletions(-)
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 124b32d4f1..bb584433d1 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -617,8 +617,13 @@ impl FileSource for CsvSource {
fn file_type(&self) -> &str {
"csv"
}
- fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, ", has_header={}", self.has_header)
+ fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, ", has_header={}", self.has_header)
+ }
+ DisplayFormatType::TreeRender => Ok(()),
+ }
}
}
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 51b38f04fa..683d62a1df 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -554,14 +554,11 @@ impl FileSource for ParquetSource {
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
- DisplayFormatType::Default
- | DisplayFormatType::Verbose
- | DisplayFormatType::TreeRender => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.predicate()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();
-
let pruning_predicate_string = self
.pruning_predicate()
.map(|pre| {
@@ -581,6 +578,12 @@ impl FileSource for ParquetSource {
write!(f, "{}{}", predicate_string, pruning_predicate_string)
}
+ DisplayFormatType::TreeRender => {
+ if let Some(predicate) = self.predicate() {
+ writeln!(f, "predicate={predicate}")?;
+ }
+ Ok(())
+ }
}
}
}
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 66ef626268..91b5f01577 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -218,7 +218,10 @@ impl DataSource for FileScanConfig {
self.fmt_file_source(t, f)
}
DisplayFormatType::TreeRender => {
- // TODO: collect info
+ writeln!(f, "format={}", self.file_source.file_type())?;
+ self.file_source.fmt_extra(t, f)?;
+ let num_files = self.file_groups.iter().map(Vec::len).sum::<usize>();
+ writeln!(f, "files={num_files}")?;
Ok(())
}
}
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index 1ea0f2ea2e..64fd56971b 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -425,25 +425,17 @@ impl DataSource for MemorySourceConfig {
}
}
DisplayFormatType::TreeRender => {
- let partition_sizes: Vec<_> =
- self.partitions.iter().map(|b| b.len()).collect();
- writeln!(f, "partition_sizes={:?}", partition_sizes)?;
-
- if let Some(output_ordering) = self.sort_information.first() {
- writeln!(f, "output_ordering={}", output_ordering)?;
- }
-
- let eq_properties = self.eq_properties();
- let constraints = eq_properties.constraints();
- if !constraints.is_empty() {
- writeln!(f, "constraints={}", constraints)?;
- }
-
- if let Some(limit) = self.fetch {
- writeln!(f, "fetch={}", limit)?;
- }
-
- write!(f, "partitions={}", partition_sizes.len())
+ let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
+ let total_bytes: usize = self
+ .partitions
+ .iter()
+ .flatten()
+ .map(|batch| batch.get_array_memory_size())
+ .sum();
+ writeln!(f, "format=memory")?;
+ writeln!(f, "rows={total_rows}")?;
+ writeln!(f, "bytes={total_bytes}")?;
+ Ok(())
}
}
}
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index bb9790e875..6e78df760d 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -52,6 +52,7 @@ pub trait DataSource: Send + Sync + Debug {
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
+ /// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
/// Return a copy of this DataSource with a new partitioning scheme
@@ -103,7 +104,7 @@ impl DisplayAs for DataSourceExec {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "DataSourceExec: ")?;
}
- DisplayFormatType::TreeRender => write!(f, "")?,
+ DisplayFormatType::TreeRender => {}
}
self.data_source.fmt_as(t, f)
}
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt
index 18de2c6a61..9d50b9bd62 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -54,10 +54,36 @@ STORED AS PARQUET
LOCATION 'test_files/scratch/explain_tree/table2.parquet';
-# table3: Memoru
+# table3: Memory
statement ok
CREATE TABLE table3 as select * from table1;
+# table4: JSON
+query I
+COPY (SELECT * from table1)
+TO 'test_files/scratch/explain_tree/table4.json'
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE table4
+STORED AS JSON
+LOCATION 'test_files/scratch/explain_tree/table4.json';
+
+# table5: ARROW
+query I
+COPY (SELECT * from table1)
+TO 'test_files/scratch/explain_tree/table5.arrow'
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE table5
+STORED AS ARROW
+LOCATION 'test_files/scratch/explain_tree/table5.arrow';
+
+
+
######## Begin Queries ########
# Filter
@@ -83,7 +109,10 @@ physical_plan
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
-15)└───────────────────────────┘
+15)│ -------------------- │
+16)│ files: 1 │
+17)│ format: csv │
+18)└───────────────────────────┘
# Aggregate
query TT
@@ -110,7 +139,10 @@ physical_plan
15)└─────────────┬─────────────┘
16)┌─────────────┴─────────────┐
17)│ DataSourceExec │
-18)└───────────────────────────┘
+18)│ -------------------- │
+19)│ files: 1 │
+20)│ format: csv │
+21)└───────────────────────────┘
# 2 Joins
query TT
@@ -139,7 +171,10 @@ physical_plan
15)└─────────────┬─────────────┘└─────────────┬─────────────┘
16)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
17)│ DataSourceExec ││ DataSourceExec │
-18)└───────────────────────────┘└───────────────────────────┘
+18)│ -------------------- ││ -------------------- │
+19)│ files: 1 ││ files: 1 │
+20)│ format: csv ││ format: parquet │
+21)└───────────────────────────┘└───────────────────────────┘
# 3 Joins
query TT
@@ -175,18 +210,22 @@ physical_plan
13)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐
14)│ CoalesceBatchesExec ││ CoalesceBatchesExec ││
DataSourceExec │
15)│ ││ ││
-------------------- │
-16)│ ││ ││
partition_sizes: [1] │
-17)│ ││ ││
partitions: 1 │
-18)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
-19)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-20)│ RepartitionExec ││ RepartitionExec │
-21)└─────────────┬─────────────┘└─────────────┬─────────────┘
-22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-23)│ RepartitionExec ││ RepartitionExec │
-24)└─────────────┬─────────────┘└─────────────┬─────────────┘
-25)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-26)│ DataSourceExec ││ DataSourceExec │
-27)└───────────────────────────┘└───────────────────────────┘
+16)│ ││ ││ bytes:
1560 │
+17)│ ││ ││ format:
memory │
+18)│ ││ ││ rows:
1 │
+19)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
+20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+21)│ RepartitionExec ││ RepartitionExec │
+22)└─────────────┬─────────────┘└─────────────┬─────────────┘
+23)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+24)│ RepartitionExec ││ RepartitionExec │
+25)└─────────────┬─────────────┘└─────────────┬─────────────┘
+26)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+27)│ DataSourceExec ││ DataSourceExec │
+28)│ -------------------- ││ -------------------- │
+29)│ files: 1 ││ files: 1 │
+30)│ format: csv ││ format: parquet │
+31)└───────────────────────────┘└───────────────────────────┘
# Long Filter (demonstrate what happens with wrapping)
query TT
@@ -213,9 +252,153 @@ physical_plan
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
-15)└───────────────────────────┘
+15)│ -------------------- │
+16)│ files: 1 │
+17)│ format: csv │
+18)└───────────────────────────┘
+
+# Query with filter on csv
+query TT
+explain SELECT int_col FROM table1 WHERE string_col != 'foo';
+----
+logical_plan
+01)Projection: table1.int_col
+02)--Filter: table1.string_col != Utf8("foo")
+03)----TableScan: table1 projection=[int_col, string_col], partial_filters=[table1.string_col != Utf8("foo")]
+physical_plan
+01)┌───────────────────────────┐
+02)│ CoalesceBatchesExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ FilterExec │
+06)│ -------------------- │
+07)│ predicate: │
+08)│ string_col@1 != foo │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│ RepartitionExec │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│ DataSourceExec │
+15)│ -------------------- │
+16)│ files: 1 │
+17)│ format: csv │
+18)└───────────────────────────┘
+# Query with filter on parquet
+query TT
+explain SELECT int_col FROM table2 WHERE string_col != 'foo';
+----
+logical_plan
+01)Projection: table2.int_col
+02)--Filter: table2.string_col != Utf8View("foo")
+03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")]
+physical_plan
+01)┌───────────────────────────┐
+02)│ CoalesceBatchesExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ FilterExec │
+06)│ -------------------- │
+07)│ predicate: │
+08)│ string_col@1 != foo │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│ RepartitionExec │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│ DataSourceExec │
+15)│ -------------------- │
+16)│ files: 1 │
+17)│ format: parquet │
+18)│ │
+19)│ predicate: │
+20)│ string_col@1 != foo │
+21)└───────────────────────────┘
+
+# Query with filter on memory
+query TT
+explain SELECT int_col FROM table3 WHERE string_col != 'foo';
+----
+logical_plan
+01)Projection: table3.int_col
+02)--Filter: table3.string_col != Utf8("foo")
+03)----TableScan: table3 projection=[int_col, string_col]
+physical_plan
+01)┌───────────────────────────┐
+02)│ CoalesceBatchesExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ FilterExec │
+06)│ -------------------- │
+07)│ predicate: │
+08)│ string_col@1 != foo │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│ DataSourceExec │
+12)│ -------------------- │
+13)│ bytes: 1560 │
+14)│ format: memory │
+15)│ rows: 1 │
+16)└───────────────────────────┘
+
+# Query with filter on json
+query TT
+explain SELECT int_col FROM table4 WHERE string_col != 'foo';
+----
+logical_plan
+01)Projection: table4.int_col
+02)--Filter: table4.string_col != Utf8("foo")
+03)----TableScan: table4 projection=[int_col, string_col], partial_filters=[table4.string_col != Utf8("foo")]
+physical_plan
+01)┌───────────────────────────┐
+02)│ CoalesceBatchesExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ FilterExec │
+06)│ -------------------- │
+07)│ predicate: │
+08)│ string_col@1 != foo │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│ RepartitionExec │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│ DataSourceExec │
+15)│ -------------------- │
+16)│ files: 1 │
+17)│ format: json │
+18)└───────────────────────────┘
+
+# Query with filter on arrow
+query TT
+explain SELECT int_col FROM table5 WHERE string_col != 'foo';
+----
+logical_plan
+01)Projection: table5.int_col
+02)--Filter: table5.string_col != Utf8("foo")
+03)----TableScan: table5 projection=[int_col, string_col], partial_filters=[table5.string_col != Utf8("foo")]
+physical_plan
+01)┌───────────────────────────┐
+02)│ CoalesceBatchesExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ FilterExec │
+06)│ -------------------- │
+07)│ predicate: │
+08)│ string_col@1 != foo │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│ RepartitionExec │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│ DataSourceExec │
+15)│ -------------------- │
+16)│ files: 1 │
+17)│ format: arrow │
+18)└───────────────────────────┘
+
# cleanup
statement ok
drop table table1;
@@ -225,3 +408,9 @@ drop table table2;
statement ok
drop table table3;
+
+statement ok
+drop table table4;
+
+statement ok
+drop table table5;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]