This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 824f1084dc Implement `tree` explain for `ArrowFileSink`, fix original URL (#15206)
824f1084dc is described below
commit 824f1084dc176f247e5adfa8a82c3b641dd84ae3
Author: irenjj <[email protected]>
AuthorDate: Sat Mar 15 04:17:35 2025 +0800
Implement `tree` explain for `ArrowFileSink`, fix original URL (#15206)
* Implement `tree` explain for `ArrowFileSink`
* fmt original_url
* fix doc test
---
.../core/src/datasource/file_format/arrow.rs | 4 +-
.../core/src/datasource/file_format/parquet.rs | 3 +
datafusion/core/src/datasource/listing/table.rs | 1 +
datafusion/core/src/physical_planner.rs | 2 +
datafusion/datasource-csv/src/file_format.rs | 6 +-
datafusion/datasource-json/src/file_format.rs | 6 +-
datafusion/datasource/src/file_sink_config.rs | 2 +
datafusion/proto/src/physical_plan/from_proto.rs | 1 +
.../proto/tests/cases/roundtrip_physical_plan.rs | 3 +
.../sqllogictest/test_files/explain_tree.slt | 70 +++++++++++++++++-----
10 files changed, 72 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 6835d9a6da..71d8d00266 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -302,8 +302,8 @@ impl DisplayAs for ArrowFileSink {
write!(f, ")")
}
DisplayFormatType::TreeRender => {
- // TODO: collect info
- write!(f, "")
+ writeln!(f, "format: arrow")?;
+ write!(f, "file={}", &self.config.original_url)
}
}
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index e2c7d1ecaf..3b71593b33 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1373,6 +1373,7 @@ mod tests {
let object_store_url = ObjectStoreUrl::local_filesystem();
let file_sink_config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse(table_path)?],
@@ -1458,6 +1459,7 @@ mod tests {
// set file config to include partitioning on field_a
let file_sink_config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
@@ -1541,6 +1543,7 @@ mod tests {
let object_store_url = ObjectStoreUrl::local_filesystem();
let file_sink_config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 4d7762784d..21b35bac21 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1037,6 +1037,7 @@ impl TableProvider for ListingTable {
// Sink related option, apart from format
let config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: self.table_paths()[0].object_store(),
table_paths: self.table_paths().clone(),
file_groups,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 6aff9280ff..170f85af7a 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -502,6 +502,7 @@ impl DefaultPhysicalPlanner {
partition_by,
options: source_option_tuples,
}) => {
+ let original_url = output_url.clone();
let input_exec = children.one()?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();
@@ -531,6 +532,7 @@ impl DefaultPhysicalPlanner {
// Set file sink related options
let config = FileSinkConfig {
+ original_url,
object_store_url,
table_paths: vec![parsed_url],
file_groups: vec![],
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index cc1b63cfc9..522cb12db0 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -666,10 +666,8 @@ impl DisplayAs for CsvSink {
write!(f, ")")
}
DisplayFormatType::TreeRender => {
- if !self.config.file_groups.is_empty() {
- FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
- }
- Ok(())
+ writeln!(f, "format: csv")?;
+ write!(f, "file={}", &self.config.original_url)
}
}
}
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs
index bec3a524f6..9b6d5925fe 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -325,10 +325,8 @@ impl DisplayAs for JsonSink {
write!(f, ")")
}
DisplayFormatType::TreeRender => {
- if !self.config.file_groups.is_empty() {
- FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
- }
- Ok(())
+ writeln!(f, "format: json")?;
+ write!(f, "file={}", &self.config.original_url)
}
}
}
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 6087f930d3..279c9d2100 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -86,6 +86,8 @@ pub trait FileSink: DataSink {
/// The base configurations to provide when creating a physical plan for
/// writing to any given file format.
pub struct FileSinkConfig {
+ /// The unresolved URL specified by the user
+ pub original_url: String,
/// Object store URL, used to get an ObjectStore instance
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file
partition
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 6331b7fb31..0bf9fdb63d 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -657,6 +657,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
protobuf::InsertOp::Replace => InsertOp::Replace,
};
Ok(Self {
+ original_url: String::default(),
object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
file_groups,
table_paths,
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index b5bfef99a6..21622390c6 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1281,6 +1281,7 @@ fn roundtrip_json_sink() -> Result<()> {
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
let file_sink_config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
@@ -1317,6 +1318,7 @@ fn roundtrip_csv_sink() -> Result<()> {
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
let file_sink_config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
@@ -1372,6 +1374,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
let file_sink_config = FileSinkConfig {
+ original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt
index 99190122f7..1a1b039658 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -1827,14 +1827,21 @@ TO 'test_files/scratch/explain_tree/1.json';
physical_plan
01)┌───────────────────────────┐
02)│ DataSinkExec │
-03)└─────────────┬─────────────┘
-04)┌─────────────┴─────────────┐
-05)│ DataSourceExec │
-06)│ -------------------- │
-07)│ bytes: 2672 │
-08)│ format: memory │
-09)│ rows: 1 │
-10)└───────────────────────────┘
+03)│ -------------------- │
+04)│ file: │
+05)│ test_files/scratch │
+06)│ /explain_tree/1 │
+07)│ .json │
+08)│ │
+09)│ format: json │
+10)└─────────────┬─────────────┘
+11)┌─────────────┴─────────────┐
+12)│ DataSourceExec │
+13)│ -------------------- │
+14)│ bytes: 2672 │
+15)│ format: memory │
+16)│ rows: 1 │
+17)└───────────────────────────┘
query TT
explain COPY (VALUES (1, 'foo', 1, '2023-01-01'), (2, 'bar', 2, '2023-01-02'),
(3, 'baz', 3, '2023-01-03'))
@@ -1843,14 +1850,45 @@ TO 'test_files/scratch/explain_tree/2.csv';
physical_plan
01)┌───────────────────────────┐
02)│ DataSinkExec │
-03)└─────────────┬─────────────┘
-04)┌─────────────┴─────────────┐
-05)│ DataSourceExec │
-06)│ -------------------- │
-07)│ bytes: 2672 │
-08)│ format: memory │
-09)│ rows: 1 │
-10)└───────────────────────────┘
+03)│ -------------------- │
+04)│ file: │
+05)│ test_files/scratch │
+06)│ /explain_tree/2 │
+07)│ .csv │
+08)│ │
+09)│ format: csv │
+10)└─────────────┬─────────────┘
+11)┌─────────────┴─────────────┐
+12)│ DataSourceExec │
+13)│ -------------------- │
+14)│ bytes: 2672 │
+15)│ format: memory │
+16)│ rows: 1 │
+17)└───────────────────────────┘
+
+query TT
+explain COPY (VALUES (1, 'foo', 1, '2023-01-01'), (2, 'bar', 2, '2023-01-02'),
(3, 'baz', 3, '2023-01-03'))
+TO 'test_files/scratch/explain_tree/3.arrow';
+----
+physical_plan
+01)┌───────────────────────────┐
+02)│ DataSinkExec │
+03)│ -------------------- │
+04)│ file: │
+05)│ test_files/scratch │
+06)│ /explain_tree/3 │
+07)│ .arrow │
+08)│ │
+09)│ format: arrow │
+10)└─────────────┬─────────────┘
+11)┌─────────────┴─────────────┐
+12)│ DataSourceExec │
+13)│ -------------------- │
+14)│ bytes: 2672 │
+15)│ format: memory │
+16)│ rows: 1 │
+17)└───────────────────────────┘
+
# Test explain tree rendering for CoalesceBatchesExec with limit
statement ok
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]