This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 330ece8e43 feat: Conditionally allow keeping partition_by columns when using PARTITIONED BY enhancement (#11107)
330ece8e43 is described below

commit 330ece8e435ecb47d3fa14729df3c3f8378526c7
Author: Hector Veiga <[email protected]>
AuthorDate: Fri Jun 28 13:13:48 2024 -0700

    feat: Conditionally allow keeping partition_by columns when using PARTITIONED BY enhancement (#11107)
    
    * feat: conditionally allow keeping partition_by columns
    
    * feat: add flag to file sink config, add tests
    
    * this commit contains:
     - separate options by prefix 'hive.'
     - add hive_options to CopyTo struct
     - add more documentation
     - add session execution flag to enable feature, false by default
    
    * do not add hive_options to CopyTo
    
    * npx prettier
    
    * fmt
    
    * change prefix to 'execution.', update override order for condition
    
    * improve handling of flag, added test for config error
    
    * trying to make CI happier
    
    * prettier
    
    * Update test
    
    * update doc
    
    ---------
    
    Co-authored-by: Héctor Veiga Ortiz <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/common/src/config.rs                     |  6 ++++++
 datafusion/core/src/datasource/file_format/arrow.rs |  1 +
 .../core/src/datasource/file_format/parquet.rs      |  7 ++++++-
 .../core/src/datasource/file_format/write/demux.rs  | 13 +++++++++----
 .../datasource/file_format/write/orchestration.rs   |  1 +
 datafusion/core/src/datasource/listing/table.rs     |  3 +++
 datafusion/core/src/datasource/physical_plan/mod.rs |  2 ++
 datafusion/core/src/physical_planner.rs             | 11 +++++++++++
 datafusion/expr/src/logical_plan/builder.rs         |  2 +-
 datafusion/proto/proto/datafusion.proto             |  1 +
 datafusion/proto/src/generated/pbjson.rs            | 18 ++++++++++++++++++
 datafusion/proto/src/generated/prost.rs             |  2 ++
 datafusion/proto/src/physical_plan/from_proto.rs    |  1 +
 datafusion/proto/src/physical_plan/to_proto.rs      |  1 +
 .../proto/tests/cases/roundtrip_physical_plan.rs    |  3 +++
 datafusion/sql/src/parser.rs                        |  6 +++++-
 datafusion/sql/src/statement.rs                     |  1 +
 datafusion/sqllogictest/test_files/copy.slt         | 21 +++++++++++++++++++++
 .../sqllogictest/test_files/information_schema.slt  |  2 ++
 docs/source/user-guide/configs.md                   |  1 +
 docs/source/user-guide/sql/dml.md                   |  5 ++++-
 docs/source/user-guide/sql/write_options.md         | 10 ++++++++++
 22 files changed, 110 insertions(+), 8 deletions(-)
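
For reference, this is how the new flag is exercised per statement (a usage
sketch based on the copy.slt tests added in this commit; the output path is
illustrative):

    COPY (VALUES ('1', 'a'), ('2', 'b'), ('3', 'c'))
    TO 'out/partitioned/' STORED AS PARQUET PARTITIONED BY (column1)
    OPTIONS (execution.keep_partition_by_columns true);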

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 7e3871e6b7..1ecdb0efd2 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -309,6 +309,8 @@ config_namespace! {
         /// Currently experimental
         pub split_file_groups_by_statistics: bool, default = false
 
+        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
+        pub keep_partition_by_columns: bool, default = false
     }
 }
 
@@ -1294,6 +1296,10 @@ impl TableOptions {
             return ConfigField::set(self, key, value);
         }
 
+        if prefix == "execution" {
+            return Ok(());
+        }
+
         let Some(e) = self.extensions.0.get_mut(prefix) else {
             return _config_err!("Could not find config namespace 
\"{prefix}\"");
         };
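
The new entry behaves like any other execution option. A sketch of inspecting
and toggling it from SQL (assuming information_schema is enabled and the usual
SET syntax for runtime configuration):

    -- defaults to false, per the config_namespace entry above
    SHOW datafusion.execution.keep_partition_by_columns;

    -- enable it for the whole session
    SET datafusion.execution.keep_partition_by_columns = true;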
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 478a11d7e7..9a3aa2454e 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -277,6 +277,7 @@ impl DataSink for ArrowFileSink {
             part_col,
             self.config.table_paths[0].clone(),
             "arrow".into(),
+            self.config.keep_partition_by_columns,
         );
 
         let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 44c9cc4ec4..89e69caffc 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -626,7 +626,9 @@ impl ParquetSink {
     /// of hive style partitioning where some columns are removed from the
     /// underlying files.
     fn get_writer_schema(&self) -> Arc<Schema> {
-        if !self.config.table_partition_cols.is_empty() {
+        if !self.config.table_partition_cols.is_empty()
+            && !self.config.keep_partition_by_columns
+        {
             let schema = self.config.output_schema();
             let partition_names: Vec<_> = self
                 .config
@@ -716,6 +718,7 @@ impl DataSink for ParquetSink {
             part_col,
             self.config.table_paths[0].clone(),
             "parquet".into(),
+            self.config.keep_partition_by_columns,
         );
 
         let mut file_write_tasks: JoinSet<
@@ -1953,6 +1956,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![],
             overwrite: true,
+            keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -2047,6 +2051,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // 
add partitioning
             overwrite: true,
+            keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index d82c2471c5..e29c877442 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -75,6 +75,7 @@ pub(crate) fn start_demuxer_task(
     partition_by: Option<Vec<(String, DataType)>>,
     base_output_path: ListingTableUrl,
     file_extension: String,
+    keep_partition_by_columns: bool,
 ) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
     let (tx, rx) = mpsc::unbounded_channel();
     let context = context.clone();
@@ -91,6 +92,7 @@ pub(crate) fn start_demuxer_task(
                     parts,
                     base_output_path,
                     file_extension,
+                    keep_partition_by_columns,
                 )
                 .await
             })
@@ -111,7 +113,7 @@ pub(crate) fn start_demuxer_task(
     (task, rx)
 }
 
-/// Dynamically partitions input stream to acheive desired maximum rows per file
+/// Dynamically partitions input stream to achieve desired maximum rows per file
 async fn row_count_demuxer(
     mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
     mut input: SendableRecordBatchStream,
@@ -240,6 +242,7 @@ async fn hive_style_partitions_demuxer(
     partition_by: Vec<(String, DataType)>,
     base_output_path: ListingTableUrl,
     file_extension: String,
+    keep_partition_by_columns: bool,
 ) -> Result<()> {
     let write_id =
         rand::distributions::Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
@@ -298,9 +301,11 @@ async fn hive_style_partitions_demuxer(
                 }
             };
 
-            // remove partitions columns
-            let final_batch_to_send =
-                remove_partition_by_columns(&parted_batch, &partition_by)?;
+            let final_batch_to_send = if keep_partition_by_columns {
+                parted_batch
+            } else {
+                remove_partition_by_columns(&parted_batch, &partition_by)?
+            };
 
             // Finally send the partial batch partitioned by distinct value!
             part_tx.send(final_batch_to_send).await.map_err(|_| {
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 3ae2122de8..a62b5715ae 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -224,6 +224,7 @@ pub(crate) async fn stateless_multipart_put(
         part_cols,
         base_output_path.clone(),
         file_extension,
+        config.keep_partition_by_columns,
     );
 
     let rb_buffer_size = &context
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index b2e81b87fa..ea4d396a14 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -889,6 +889,8 @@ impl TableProvider for ListingTable {
         .await?;
 
         let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
+        let keep_partition_by_columns =
+            state.config().options().execution.keep_partition_by_columns;
 
         // Sink related option, apart from format
         let config = FileSinkConfig {
@@ -898,6 +900,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             overwrite,
+            keep_partition_by_columns,
         };
 
         let unsorted: Vec<Vec<Expr>> = vec![];
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 720e29e355..a897895246 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -85,6 +85,8 @@ pub struct FileSinkConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
+    /// Controls whether partition columns are kept for the file
+    pub keep_partition_by_columns: bool,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 5b8501baaa..9dfc62f3a7 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -775,6 +775,16 @@ impl DefaultPhysicalPlanner {
                     .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                     .collect::<Vec<_>>();
 
+                let keep_partition_by_columns = match source_option_tuples
+                    .get("execution.keep_partition_by_columns")
+                    .map(|v| v.trim()) {
+                    None => session_state.config().options().execution.keep_partition_by_columns,
+                    Some("true") => true,
+                    Some("false") => false,
+                    Some(value) =>
+                        return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"", value))),
+                };
+
                 // Set file sink related options
                 let config = FileSinkConfig {
                     object_store_url,
@@ -783,6 +793,7 @@ impl DefaultPhysicalPlanner {
                     output_schema: Arc::new(schema),
                     table_partition_cols,
                     overwrite: false,
+                    keep_partition_by_columns,
                 };
 
                 let sink_format = file_type_to_format(file_type)?
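
The match above gives a per-statement OPTIONS entry precedence over the
session-level ExecutionOptions value, falling back to the session setting only
when the key is absent, and rejects anything other than true/false with a
configuration error. A sketch of that precedence (table and path are
illustrative):

    -- session flag left at its default (false); the per-query option wins,
    -- so this COPY keeps the partition column in the written files
    COPY t TO 'out/' STORED AS PARQUET PARTITIONED BY (c)
    OPTIONS (execution.keep_partition_by_columns true);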
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index f87151efd8..11a8112650 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -279,9 +279,9 @@ impl LogicalPlanBuilder {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
+            partition_by,
             file_type,
             options,
-            partition_by,
         })))
     }
 
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index f2594ba103..004d7320e2 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -754,6 +754,7 @@ message FileSinkConfig {
   datafusion_common.Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
   bool overwrite = 8;
+  bool keep_partition_by_columns = 9;
 }
 
 message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index e8fbe95442..ebfa783f85 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -5562,6 +5562,9 @@ impl serde::Serialize for FileSinkConfig {
         if self.overwrite {
             len += 1;
         }
+        if self.keep_partition_by_columns {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?;
         if !self.object_store_url.is_empty() {
             struct_ser.serialize_field("objectStoreUrl", 
&self.object_store_url)?;
@@ -5581,6 +5584,9 @@ impl serde::Serialize for FileSinkConfig {
         if self.overwrite {
             struct_ser.serialize_field("overwrite", &self.overwrite)?;
         }
+        if self.keep_partition_by_columns {
+            struct_ser.serialize_field("keepPartitionByColumns", 
&self.keep_partition_by_columns)?;
+        }
         struct_ser.end()
     }
 }
@@ -5602,6 +5608,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "table_partition_cols",
             "tablePartitionCols",
             "overwrite",
+            "keep_partition_by_columns",
+            "keepPartitionByColumns",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -5612,6 +5620,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             OutputSchema,
             TablePartitionCols,
             Overwrite,
+            KeepPartitionByColumns,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -5639,6 +5648,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             "outputSchema" | "output_schema" => 
Ok(GeneratedField::OutputSchema),
                             "tablePartitionCols" | "table_partition_cols" => 
Ok(GeneratedField::TablePartitionCols),
                             "overwrite" => Ok(GeneratedField::Overwrite),
+                            "keepPartitionByColumns" | 
"keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -5664,6 +5674,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                 let mut output_schema__ = None;
                 let mut table_partition_cols__ = None;
                 let mut overwrite__ = None;
+                let mut keep_partition_by_columns__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::ObjectStoreUrl => {
@@ -5702,6 +5713,12 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             }
                             overwrite__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::KeepPartitionByColumns => {
+                            if keep_partition_by_columns__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("keepPartitionByColumns"));
+                            }
+                            keep_partition_by_columns__ = Some(map_.next_value()?);
+                        }
                     }
                 }
                 Ok(FileSinkConfig {
@@ -5711,6 +5728,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                     output_schema: output_schema__,
                    table_partition_cols: table_partition_cols__.unwrap_or_default(),
                    overwrite: overwrite__.unwrap_or_default(),
+                    keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 93bf6c0602..1a3514dbd4 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1128,6 +1128,8 @@ pub struct FileSinkConfig {
     pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
     #[prost(bool, tag = "8")]
     pub overwrite: bool,
+    #[prost(bool, tag = "9")]
+    pub keep_partition_by_columns: bool,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 7783c15611..e94bb3b8ef 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -649,6 +649,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
             overwrite: conf.overwrite,
+            keep_partition_by_columns: conf.keep_partition_by_columns,
         })
     }
 }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 8583900e9f..3753612619 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -724,6 +724,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             output_schema: Some(conf.output_schema.as_ref().try_into()?),
             table_partition_cols,
             overwrite: conf.overwrite,
+            keep_partition_by_columns: conf.keep_partition_by_columns,
         })
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 03c72cfc32..106247b2d4 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -899,6 +899,7 @@ fn roundtrip_json_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        keep_partition_by_columns: true,
     };
     let data_sink = Arc::new(JsonSink::new(
         file_sink_config,
@@ -934,6 +935,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        keep_partition_by_columns: true,
     };
     let data_sink = Arc::new(CsvSink::new(
         file_sink_config,
@@ -992,6 +994,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
         overwrite: true,
+        keep_partition_by_columns: true,
     };
     let data_sink = Arc::new(ParquetSink::new(
         file_sink_config,
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index d2f3f508a3..5da7f71765 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -1475,7 +1475,7 @@ mod tests {
     fn copy_to_multi_options() -> Result<(), ParserError> {
         // order of options is preserved
         let sql =
-            "COPY foo TO bar STORED AS parquet OPTIONS 
('format.row_group_size' 55, 'format.compression' snappy)";
+            "COPY foo TO bar STORED AS parquet OPTIONS 
('format.row_group_size' 55, 'format.compression' snappy, 
'execution.keep_partition_by_columns' true)";
 
         let expected_options = vec![
             (
@@ -1486,6 +1486,10 @@ mod tests {
                 "format.compression".to_string(),
                 Value::SingleQuotedString("snappy".to_string()),
             ),
+            (
+                "execution.keep_partition_by_columns".to_string(),
+                Value::SingleQuotedString("true".to_string()),
+            ),
         ];
 
         let mut statements = DFParser::parse_sql(sql).unwrap();
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 518972545a..6cdb2f959c 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -888,6 +888,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 }
                 Some(v) => v,
             };
+
             if !(&key.contains('.')) {
                 // If config does not belong to any namespace, assume it is
                 // a format option and apply the format prefix for backwards
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 00bcea7ec1..21c34bc25c 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -166,6 +166,23 @@ physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
 02)--MemoryExec: partitions=1, partition_sizes=[1]
 
+# Copy to directory as partitioned files with keep_partition_by_columns enabled
+query TT
+COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
+OPTIONS (execution.keep_partition_by_columns true);
+----
+3
+
+# validate generated file contains tables
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet4 STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table4/column1=1/*.parquet';
+
+query TT
+select column1, column2 from validate_partitioned_parquet4 order by column1,column2;
+----
+1 a
+
 # Copy more files to directory via query
 query IT
 COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' STORED AS PARQUET;
@@ -589,3 +606,7 @@ COPY (select col2, sum(col1) from source_table
 # Copy from table with non literal
 query error DataFusion error: SQL error: ParserError\("Unexpected token \("\)
 COPY source_table  to '/tmp/table.parquet' (row_group_size 55 + 102);
+
+# Copy using execution.keep_partition_by_columns with an invalid value
+query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value"
+COPY source_table  to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value);
\ No newline at end of file
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 3cc837aa8e..ee64f77291 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -173,6 +173,7 @@ datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
 datafusion.execution.enable_recursive_ctes true
+datafusion.execution.keep_partition_by_columns false
 datafusion.execution.listing_table_ignore_subdirectory true
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
@@ -255,6 +256,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch
 datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
 datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
 datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
+datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index c5f22725e0..0f0aa84604 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -86,6 +86,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.listing_table_ignore_subdirectory                  | true                      | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). [...]
 | datafusion.execution.enable_recursive_ctes                              | true                      | Should DataFusion support recursive CTEs [...]
 | datafusion.execution.split_file_groups_by_statistics                    | false                     | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental [...]
+| datafusion.execution.keep_partition_by_columns                          | false                     | Should DataFusion keep the columns used for partition_by in the output RecordBatches [...]
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit             | true                      | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. [...]
 | datafusion.optimizer.enable_round_robin_repartition                     | true                      | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores [...]
 | datafusion.optimizer.enable_topk_aggregation                            | true                      | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible [...]
diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md
index 42e0c8054c..dd016cabbf 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -39,7 +39,10 @@ TO '<i><b>file_name</i></b>'
 clause is not specified, it will be inferred from the file extension if possible.
 
 `PARTITIONED BY` specifies the columns to use for partitioning the output files into
-separate hive-style directories.
+separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
+from the output files. If you want to keep the columns, provide the option
+`execution.keep_partition_by_columns true`. The `execution.keep_partition_by_columns` flag can also
+be enabled through `ExecutionOptions` within `SessionConfig`.
 
 The output format is determined by the first match of the following rules:
 
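As a concrete illustration of the documented behavior (adapted from the
copy.slt test added in this commit; paths and table names are illustrative):

    COPY (VALUES ('1', 'a')) TO 'scratch/partitioned/' STORED AS PARQUET
    PARTITIONED BY (column1)
    OPTIONS (execution.keep_partition_by_columns true);

    -- with the flag on, files under column1=1/ still contain the column:
    CREATE EXTERNAL TABLE validate STORED AS PARQUET
    LOCATION 'scratch/partitioned/column1=1/';
    SELECT column1, column2 FROM validate;  -- 1 a
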
diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index 3c4790dd02..6fb4ef215f 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -70,6 +70,16 @@ In this example, we write the entirety of `source_table` out to a folder of parq
 
 ## Available Options
 
+### Execution Specific Options
+
+The following options are available when executing a `COPY` query.
+
+| Option                              | Description                                                                         | Default Value |
+| ----------------------------------- | ----------------------------------------------------------------------------------- | ------------- |
+| execution.keep_partition_by_columns | Flag to retain the columns in the output data when using `PARTITIONED BY` queries. | false         |
+
+Note: the `execution.keep_partition_by_columns` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.
+
 ### JSON Format Specific Options
 
 The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
