This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2093551d84 Fix: CopyTo logical plan outputs 1 column (#16705)
2093551d84 is described below
commit 2093551d841966d1292c3ceef3b98a472d039a11
Author: Bert Vermeiren <[email protected]>
AuthorDate: Tue Jul 8 23:46:28 2025 +0200
Fix: CopyTo logical plan outputs 1 column (#16705)
Co-authored-by: Bert Vermeiren <[email protected]>
---
datafusion/core/src/physical_planner.rs | 1 +
datafusion/core/tests/dataframe/mod.rs | 26 ++++++++
datafusion/expr/src/logical_plan/builder.rs | 6 +-
datafusion/expr/src/logical_plan/display.rs | 1 +
datafusion/expr/src/logical_plan/dml.rs | 23 +++++++
datafusion/expr/src/logical_plan/plan.rs | 17 ++---
datafusion/expr/src/logical_plan/tree_node.rs | 2 +
datafusion/proto/src/logical_plan/mod.rs | 12 ++--
.../proto/tests/cases/roundtrip_logical_plan.rs | 72 +++++++++++-----------
datafusion/sql/src/statement.rs | 12 ++--
10 files changed, 113 insertions(+), 59 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 9ec91e6953..293f2cfc96 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -505,6 +505,7 @@ impl DefaultPhysicalPlanner {
file_type,
partition_by,
options: source_option_tuples,
+ output_schema: _,
}) => {
let original_url = output_url.clone();
let input_exec = children.one()?;
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 8d60dbea3d..a4c5892d94 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -6137,3 +6137,29 @@ async fn test_dataframe_macro() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_copy_schema() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+
+ let session_state = SessionStateBuilder::new_with_default_features().build();
+
+ let session_ctx = SessionContext::new_with_state(session_state);
+
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+
+ // Create and register the source table with the provided schema and data
+ let source_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
+ session_ctx.register_table("source_table", source_table.clone())?;
+
+ let target_path = tmp_dir.path().join("target.csv");
+
+ let query = format!(
+ "COPY source_table TO '{:?}' STORED AS csv",
+ target_path.to_str().unwrap()
+ );
+
+ let result = session_ctx.sql(&query).await?;
+ assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
+ Ok(())
+}
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index f2e8bbd4dd..2ad667e8e9 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -407,13 +407,13 @@ impl LogicalPlanBuilder {
options: HashMap<String, String>,
partition_by: Vec<String>,
) -> Result<Self> {
- Ok(Self::new(LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
+ Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
output_url,
partition_by,
file_type,
options,
- })))
+ ))))
}
/// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index f1e455f46d..cc3fbad7b0 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -426,6 +426,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
file_type,
partition_by: _,
options,
+ output_schema: _,
}) => {
let op_str = options
.iter()
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index f3c95e696b..369b91e204 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -40,6 +40,8 @@ pub struct CopyTo {
pub file_type: Arc<dyn FileType>,
/// SQL Options that can affect the formats
pub options: HashMap<String, String>,
+ /// The schema of the output (a single column "count")
+ pub output_schema: DFSchemaRef,
}
impl Debug for CopyTo {
@@ -50,6 +52,7 @@ impl Debug for CopyTo {
.field("partition_by", &self.partition_by)
.field("file_type", &"...")
.field("options", &self.options)
+ .field("output_schema", &self.output_schema)
.finish_non_exhaustive()
}
}
@@ -89,6 +92,26 @@ impl Hash for CopyTo {
}
}
+impl CopyTo {
+ pub fn new(
+ input: Arc<LogicalPlan>,
+ output_url: String,
+ partition_by: Vec<String>,
+ file_type: Arc<dyn FileType>,
+ options: HashMap<String, String>,
+ ) -> Self {
+ Self {
+ input,
+ output_url,
+ partition_by,
+ file_type,
+ options,
+ // The output schema is always a single column "count" with the number of rows copied
+ output_schema: make_count_schema(),
+ }
+ }
+}
+
/// Modifies the content of a database
///
/// This operator is used to perform DML operations such as INSERT, DELETE,
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 629f5c332b..f112b7136d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -345,7 +345,7 @@ impl LogicalPlan {
output_schema
}
LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
- LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
+ LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
@@ -810,16 +810,17 @@ impl LogicalPlan {
file_type,
options,
partition_by,
+ output_schema: _,
}) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
- Ok(LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: output_url.clone(),
- file_type: Arc::clone(file_type),
- options: options.clone(),
- partition_by: partition_by.clone(),
- }))
+ Ok(LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ output_url.clone(),
+ partition_by.clone(),
+ Arc::clone(file_type),
+ options.clone(),
+ )))
}
LogicalPlan::Values(Values { schema, .. }) => {
self.assert_no_inputs(inputs)?;
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 527248ad39..b38652bdc1 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -243,6 +243,7 @@ impl TreeNode for LogicalPlan {
partition_by,
file_type,
options,
+ output_schema,
}) => input.map_elements(f)?.update_data(|input| {
LogicalPlan::Copy(CopyTo {
input,
@@ -250,6 +251,7 @@ impl TreeNode for LogicalPlan {
partition_by,
file_type,
options,
+ output_schema,
})
}),
LogicalPlan::Ddl(ddl) => {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 1acf1ee27b..5f02288992 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -905,13 +905,13 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
);
- Ok(LogicalPlan::Copy(dml::CopyTo {
- input: Arc::new(input),
- output_url: copy.output_url.clone(),
- partition_by: copy.partition_by.clone(),
+ Ok(LogicalPlan::Copy(dml::CopyTo::new(
+ Arc::new(input),
+ copy.output_url.clone(),
+ copy.partition_by.clone(),
file_type,
- options: Default::default(),
- }))
+ Default::default(),
+ )))
}
LogicalPlanType::Unnest(unnest) => {
let input: LogicalPlan =
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 12c05cfcb5..6c51d553fe 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -429,13 +429,13 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
let input = create_csv_scan(&ctx).await?;
let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
- let plan = LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: "test.csv".to_string(),
- partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.csv".to_string(),
+ vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
- options: Default::default(),
- });
+ Default::default(),
+ ));
let codec = CsvLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -469,13 +469,13 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
ParquetFormatFactory::new_with_options(parquet_format),
));
- let plan = LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: "test.parquet".to_string(),
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.parquet".to_string(),
+ vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
- partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
- options: Default::default(),
- });
+ Default::default(),
+ ));
let codec = ParquetLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -501,13 +501,13 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));
- let plan = LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: "test.arrow".to_string(),
- partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.arrow".to_string(),
+ vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
- options: Default::default(),
- });
+ Default::default(),
+ ));
let codec = ArrowLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -548,13 +548,13 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
csv_format.clone(),
)));
- let plan = LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: "test.csv".to_string(),
- partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.csv".to_string(),
+ vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
- options: Default::default(),
- });
+ Default::default(),
+ ));
let codec = CsvLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -614,13 +614,13 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
json_format.clone(),
)));
- let plan = LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: "test.json".to_string(),
- partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.json".to_string(),
+ vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
- options: Default::default(),
- });
+ Default::default(),
+ ));
// Assume JsonLogicalExtensionCodec is implemented similarly to CsvLogicalExtensionCodec
let codec = JsonLogicalExtensionCodec {};
@@ -686,13 +686,13 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
ParquetFormatFactory::new_with_options(parquet_format.clone()),
));
- let plan = LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: "test.parquet".to_string(),
- partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+ let plan = LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ "test.parquet".to_string(),
+ vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
- options: Default::default(),
- });
+ Default::default(),
+ ));
// Assume ParquetLogicalExtensionCodec is implemented similarly to JsonLogicalExtensionCodec
let codec = ParquetLogicalExtensionCodec {};
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index f83cffe47a..b2bea86f55 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1388,13 +1388,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.map(|f| f.name().to_owned())
.collect();
- Ok(LogicalPlan::Copy(CopyTo {
- input: Arc::new(input),
- output_url: statement.target,
- file_type,
+ Ok(LogicalPlan::Copy(CopyTo::new(
+ Arc::new(input),
+ statement.target,
partition_by,
- options: options_map,
- }))
+ file_type,
+ options_map,
+ )))
}
fn build_order_by(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]