This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2093551d84 Fix: CopyTo logical plan outputs 1 column (#16705)
2093551d84 is described below

commit 2093551d841966d1292c3ceef3b98a472d039a11
Author: Bert Vermeiren <[email protected]>
AuthorDate: Tue Jul 8 23:46:28 2025 +0200

    Fix: CopyTo logical plan outputs 1 column (#16705)
    
    Co-authored-by: Bert Vermeiren <[email protected]>
---
 datafusion/core/src/physical_planner.rs            |  1 +
 datafusion/core/tests/dataframe/mod.rs             | 26 ++++++++
 datafusion/expr/src/logical_plan/builder.rs        |  6 +-
 datafusion/expr/src/logical_plan/display.rs        |  1 +
 datafusion/expr/src/logical_plan/dml.rs            | 23 +++++++
 datafusion/expr/src/logical_plan/plan.rs           | 17 ++---
 datafusion/expr/src/logical_plan/tree_node.rs      |  2 +
 datafusion/proto/src/logical_plan/mod.rs           | 12 ++--
 .../proto/tests/cases/roundtrip_logical_plan.rs    | 72 +++++++++++-----------
 datafusion/sql/src/statement.rs                    | 12 ++--
 10 files changed, 113 insertions(+), 59 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 9ec91e6953..293f2cfc96 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -505,6 +505,7 @@ impl DefaultPhysicalPlanner {
                 file_type,
                 partition_by,
                 options: source_option_tuples,
+                output_schema: _,
             }) => {
                 let original_url = output_url.clone();
                 let input_exec = children.one()?;
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 8d60dbea3d..a4c5892d94 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -6137,3 +6137,29 @@ async fn test_dataframe_macro() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_copy_schema() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+
+    let session_state = SessionStateBuilder::new_with_default_features().build();
+
+    let session_ctx = SessionContext::new_with_state(session_state);
+
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+
+    // Create and register the source table with the provided schema and data
+    let source_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
+    session_ctx.register_table("source_table", source_table.clone())?;
+
+    let target_path = tmp_dir.path().join("target.csv");
+
+    let query = format!(
+        "COPY source_table TO '{:?}' STORED AS csv",
+        target_path.to_str().unwrap()
+    );
+
+    let result = session_ctx.sql(&query).await?;
+    assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
+    Ok(())
+}
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index f2e8bbd4dd..2ad667e8e9 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -407,13 +407,13 @@ impl LogicalPlanBuilder {
         options: HashMap<String, String>,
         partition_by: Vec<String>,
     ) -> Result<Self> {
-        Ok(Self::new(LogicalPlan::Copy(CopyTo {
-            input: Arc::new(input),
+        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
+            Arc::new(input),
             output_url,
             partition_by,
             file_type,
             options,
-        })))
+        ))))
     }
 
     /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index f1e455f46d..cc3fbad7b0 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -426,6 +426,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                 file_type,
                 partition_by: _,
                 options,
+                output_schema: _,
             }) => {
                 let op_str = options
                     .iter()
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index f3c95e696b..369b91e204 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -40,6 +40,8 @@ pub struct CopyTo {
     pub file_type: Arc<dyn FileType>,
     /// SQL Options that can affect the formats
     pub options: HashMap<String, String>,
+    /// The schema of the output (a single column "count")
+    pub output_schema: DFSchemaRef,
 }
 
 impl Debug for CopyTo {
@@ -50,6 +52,7 @@ impl Debug for CopyTo {
             .field("partition_by", &self.partition_by)
             .field("file_type", &"...")
             .field("options", &self.options)
+            .field("output_schema", &self.output_schema)
             .finish_non_exhaustive()
     }
 }
@@ -89,6 +92,26 @@ impl Hash for CopyTo {
     }
 }
 
+impl CopyTo {
+    pub fn new(
+        input: Arc<LogicalPlan>,
+        output_url: String,
+        partition_by: Vec<String>,
+        file_type: Arc<dyn FileType>,
+        options: HashMap<String, String>,
+    ) -> Self {
+        Self {
+            input,
+            output_url,
+            partition_by,
+            file_type,
+            options,
+            // The output schema is always a single column "count" with the number of rows copied
+            output_schema: make_count_schema(),
+        }
+    }
+}
+
 /// Modifies the content of a database
 ///
 /// This operator is used to perform DML operations such as INSERT, DELETE,
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 629f5c332b..f112b7136d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -345,7 +345,7 @@ impl LogicalPlan {
                 output_schema
             }
             LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
-            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
+            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
             LogicalPlan::Ddl(ddl) => ddl.schema(),
             LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
             LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
@@ -810,16 +810,17 @@ impl LogicalPlan {
                 file_type,
                 options,
                 partition_by,
+                output_schema: _,
             }) => {
                 self.assert_no_expressions(expr)?;
                 let input = self.only_input(inputs)?;
-                Ok(LogicalPlan::Copy(CopyTo {
-                    input: Arc::new(input),
-                    output_url: output_url.clone(),
-                    file_type: Arc::clone(file_type),
-                    options: options.clone(),
-                    partition_by: partition_by.clone(),
-                }))
+                Ok(LogicalPlan::Copy(CopyTo::new(
+                    Arc::new(input),
+                    output_url.clone(),
+                    partition_by.clone(),
+                    Arc::clone(file_type),
+                    options.clone(),
+                )))
             }
             LogicalPlan::Values(Values { schema, .. }) => {
                 self.assert_no_inputs(inputs)?;
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 527248ad39..b38652bdc1 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -243,6 +243,7 @@ impl TreeNode for LogicalPlan {
                 partition_by,
                 file_type,
                 options,
+                output_schema,
             }) => input.map_elements(f)?.update_data(|input| {
                 LogicalPlan::Copy(CopyTo {
                     input,
@@ -250,6 +251,7 @@ impl TreeNode for LogicalPlan {
                     partition_by,
                     file_type,
                     options,
+                    output_schema,
                 })
             }),
             LogicalPlan::Ddl(ddl) => {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 1acf1ee27b..5f02288992 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -905,13 +905,13 @@ impl AsLogicalPlan for LogicalPlanNode {
                     extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
                 );
 
-                Ok(LogicalPlan::Copy(dml::CopyTo {
-                    input: Arc::new(input),
-                    output_url: copy.output_url.clone(),
-                    partition_by: copy.partition_by.clone(),
+                Ok(LogicalPlan::Copy(dml::CopyTo::new(
+                    Arc::new(input),
+                    copy.output_url.clone(),
+                    copy.partition_by.clone(),
                     file_type,
-                    options: Default::default(),
-                }))
+                    Default::default(),
+                )))
             }
             LogicalPlanType::Unnest(unnest) => {
                 let input: LogicalPlan =
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 12c05cfcb5..6c51d553fe 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -429,13 +429,13 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
     let input = create_csv_scan(&ctx).await?;
     let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
 
-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.csv".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.csv".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));
 
     let codec = CsvLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -469,13 +469,13 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
         ParquetFormatFactory::new_with_options(parquet_format),
     ));
 
-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.parquet".to_string(),
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.parquet".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
-        options: Default::default(),
-    });
+        Default::default(),
+    ));
 
     let codec = ParquetLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -501,13 +501,13 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
 
     let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));
 
-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.arrow".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.arrow".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));
 
     let codec = ArrowLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -548,13 +548,13 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
         csv_format.clone(),
     )));
 
-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.csv".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.csv".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));
 
     let codec = CsvLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -614,13 +614,13 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
         json_format.clone(),
     )));
 
-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.json".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.json".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));
 
     // Assume JsonLogicalExtensionCodec is implemented similarly to CsvLogicalExtensionCodec
     let codec = JsonLogicalExtensionCodec {};
@@ -686,13 +686,13 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
         ParquetFormatFactory::new_with_options(parquet_format.clone()),
     ));
 
-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.parquet".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.parquet".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));
 
     // Assume ParquetLogicalExtensionCodec is implemented similarly to JsonLogicalExtensionCodec
     let codec = ParquetLogicalExtensionCodec {};
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index f83cffe47a..b2bea86f55 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1388,13 +1388,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             .map(|f| f.name().to_owned())
             .collect();
 
-        Ok(LogicalPlan::Copy(CopyTo {
-            input: Arc::new(input),
-            output_url: statement.target,
-            file_type,
+        Ok(LogicalPlan::Copy(CopyTo::new(
+            Arc::new(input),
+            statement.target,
             partition_by,
-            options: options_map,
-        }))
+            file_type,
+            options_map,
+        )))
     }
 
     fn build_order_by(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to