This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 87eb126eba Support Write Options in DataFrame::write_* methods (#7435)
87eb126eba is described below
commit 87eb126ebaee0f832a61a73a0f601a1c278b16b1
Author: Devin D'Angelo <[email protected]>
AuthorDate: Wed Sep 6 09:16:50 2023 -0400
Support Write Options in DataFrame::write_* methods (#7435)
* merge main, squash
* remove nonexistent import
* add test
* fix pathing for Windows
* test all supported compression codecs
* update comment
---
datafusion-examples/examples/dataframe-to-s3.rs | 12 +-
datafusion/common/src/file_options/csv_writer.rs | 13 ++
datafusion/common/src/file_options/json_writer.rs | 6 +
.../common/src/file_options/parquet_writer.rs | 8 +-
datafusion/core/src/dataframe.rs | 155 ++++++++++++++++++---
.../core/src/datasource/physical_plan/csv.rs | 6 +-
.../core/src/datasource/physical_plan/json.rs | 6 +-
.../core/src/datasource/physical_plan/parquet.rs | 6 +-
datafusion/core/src/physical_planner.rs | 17 ++-
datafusion/expr/src/logical_plan/builder.rs | 7 +-
datafusion/expr/src/logical_plan/dml.rs | 45 +++++-
datafusion/expr/src/logical_plan/plan.rs | 20 +--
datafusion/expr/src/utils.rs | 4 +-
datafusion/sql/src/statement.rs | 6 +-
14 files changed, 261 insertions(+), 50 deletions(-)
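For readers skimming the diff: the net effect is that every DataFrame::write_* method now takes a DataFrameWriteOptions argument, and write_csv/write_parquet additionally accept optional format-specific writer properties. A minimal sketch of the new call shapes (the paths and the `df` handle are illustrative, not taken from this commit; assumes an async context returning a DataFusion Result):

    use datafusion::dataframe::DataFrameWriteOptions;
    use parquet::file::properties::WriterProperties;

    // `df: DataFrame` is assumed to be already constructed
    df.clone()
        .write_csv("/tmp/out_csv/", DataFrameWriteOptions::new(), None)
        .await?;
    df.clone()
        .write_json("/tmp/out_json/", DataFrameWriteOptions::new())
        .await?;
    df.write_parquet(
        "/tmp/out_parquet/",
        DataFrameWriteOptions::new(),
        Some(WriterProperties::builder().build()),
    )
    .await?;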
diff --git a/datafusion-examples/examples/dataframe-to-s3.rs b/datafusion-examples/examples/dataframe-to-s3.rs
index 0d8d0a9fc1..883da7d0d1 100644
--- a/datafusion-examples/examples/dataframe-to-s3.rs
+++ b/datafusion-examples/examples/dataframe-to-s3.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
@@ -61,15 +62,20 @@ async fn main() -> Result<()> {
let df = ctx.sql("SELECT * from test").await?;
let out_path = format!("s3://{bucket_name}/test_write/");
- df.clone().write_parquet(&out_path, None).await?;
+ df.clone()
+ .write_parquet(&out_path, DataFrameWriteOptions::new(), None)
+ .await?;
//write as JSON to s3
let json_out = format!("s3://{bucket_name}/json_out");
- df.clone().write_json(&json_out).await?;
+ df.clone()
+ .write_json(&json_out, DataFrameWriteOptions::new())
+ .await?;
//write as csv to s3
let csv_out = format!("s3://{bucket_name}/csv_out");
- df.write_csv(&csv_out).await?;
+ df.write_csv(&csv_out, DataFrameWriteOptions::new(), None)
+ .await?;
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions::new(Arc::new(file_format))
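As the updated example above shows, DataFrameWriteOptions::new() supplies the defaults (no overwrite, multiple output files, no DataFusion-applied compression). The builder methods added later in this diff chain in the usual way; a hedged sketch (the GZIP variant of CompressionTypeVariant is assumed to be available):

    use datafusion_common::parsers::CompressionTypeVariant;

    // defaults: overwrite = false, single_file_output = false, UNCOMPRESSED
    let opts = DataFrameWriteOptions::new()
        .with_single_file_output(true)
        .with_compression(CompressionTypeVariant::GZIP); // CSV/JSON only; rejected by write_parquet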
diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs
index ebf177fdce..336180b256 100644
--- a/datafusion/common/src/file_options/csv_writer.rs
+++ b/datafusion/common/src/file_options/csv_writer.rs
@@ -46,6 +46,19 @@ pub struct CsvWriterOptions {
// https://github.com/apache/arrow-rs/issues/4735
}
+impl CsvWriterOptions {
+ pub fn new(
+ writer_options: WriterBuilder,
+ compression: CompressionTypeVariant,
+ ) -> Self {
+ Self {
+ writer_options,
+ compression,
+ has_header: true,
+ }
+ }
+}
+
impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
type Error = DataFusionError;
diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs
index b3ea76b651..7f988016c6 100644
--- a/datafusion/common/src/file_options/json_writer.rs
+++ b/datafusion/common/src/file_options/json_writer.rs
@@ -33,6 +33,12 @@ pub struct JsonWriterOptions {
pub compression: CompressionTypeVariant,
}
+impl JsonWriterOptions {
+ pub fn new(compression: CompressionTypeVariant) -> Self {
+ Self { compression }
+ }
+}
+
impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions {
type Error = DataFusionError;
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index ea3276b062..fed773f29e 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -36,6 +36,12 @@ pub struct ParquetWriterOptions {
pub writer_options: WriterProperties,
}
+impl ParquetWriterOptions {
+ pub fn new(writer_options: WriterProperties) -> Self {
+ Self { writer_options }
+ }
+}
+
impl ParquetWriterOptions {
pub fn writer_options(&self) -> &WriterProperties {
&self.writer_options
@@ -44,7 +50,7 @@ impl ParquetWriterOptions {
/// Constructs a default Parquet WriterPropertiesBuilder using
/// Session level ConfigOptions to initialize settings
-fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
+pub fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
let parquet_session_options = &options.execution.parquet;
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
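The three new constructors above (CsvWriterOptions::new, JsonWriterOptions::new, ParquetWriterOptions::new) exist so Rust callers can build FileTypeWriterOptions values directly instead of parsing string options. A sketch of how they compose, using only the types this diff shows:

    use arrow::csv::WriterBuilder;
    use datafusion_common::file_options::csv_writer::CsvWriterOptions;
    use datafusion_common::parsers::CompressionTypeVariant;
    use datafusion_common::FileTypeWriterOptions;

    // wrapped the same way DataFrame::write_csv does internally below
    let csv_opts = FileTypeWriterOptions::CSV(CsvWriterOptions::new(
        WriterBuilder::new(),
        CompressionTypeVariant::UNCOMPRESSED,
    ));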
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index fbd78ec9d7..648b2340d4 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -22,10 +22,19 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
+use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
-use datafusion_common::file_options::StatementOptions;
-use datafusion_common::{DataFusionError, FileType, SchemaError, UnnestOptions};
+use datafusion_common::file_options::csv_writer::CsvWriterOptions;
+use datafusion_common::file_options::json_writer::JsonWriterOptions;
+use datafusion_common::file_options::parquet_writer::{
+ default_builder, ParquetWriterOptions,
+};
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::{
+ DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
+};
+use datafusion_expr::dml::CopyOptions;
use parquet::file::properties::WriterProperties;
use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -57,21 +66,42 @@ use crate::prelude::SessionContext;
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
/// Controls if existing data should be overwritten
- overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
- // settings such as LOCATION and FILETYPE can be set here
- // e.g. add location: Option<Path>
+ overwrite: bool,
+ /// Controls if all partitions should be coalesced into a single output file
+ /// Generally will have slower performance when set to true.
+ single_file_output: bool,
+ /// Sets compression applied by DataFusion after file serialization.
+ /// Allows compression of CSV and JSON.
+ /// Not supported for parquet.
+ compression: CompressionTypeVariant,
}
impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
- DataFrameWriteOptions { overwrite: false }
+ DataFrameWriteOptions {
+ overwrite: false,
+ single_file_output: false,
+ compression: CompressionTypeVariant::UNCOMPRESSED,
+ }
}
/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
self
}
+
+ /// Set the single_file_output value to true or false
+ pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
+ self.single_file_output = single_file_output;
+ self
+ }
+
+ /// Sets the compression type applied to the output file(s)
+ pub fn with_compression(mut self, compression: CompressionTypeVariant) -> Self {
+ self.compression = compression;
+ self
+ }
}
impl Default for DataFrameWriteOptions {
@@ -995,14 +1025,29 @@ impl DataFrame {
pub async fn write_csv(
self,
path: &str,
+ options: DataFrameWriteOptions,
+ writer_properties: Option<WriterBuilder>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
+ if options.overwrite {
+ return Err(DataFusionError::NotImplemented(
+ "Overwrites are not implemented for
DataFrame::write_csv.".to_owned(),
+ ));
+ }
+ let props = match writer_properties {
+ Some(props) => props,
+ None => WriterBuilder::new(),
+ };
+
+ let file_type_writer_options =
+ FileTypeWriterOptions::CSV(CsvWriterOptions::new(props, options.compression));
+ let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
+
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
FileType::CSV,
- false,
- // TODO implement options
- StatementOptions::new(vec![]),
+ options.single_file_output,
+ copy_options,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
@@ -1012,15 +1057,31 @@ impl DataFrame {
pub async fn write_parquet(
self,
path: &str,
- _writer_properties: Option<WriterProperties>,
+ options: DataFrameWriteOptions,
+ writer_properties: Option<WriterProperties>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
+ if options.overwrite {
+ return Err(DataFusionError::NotImplemented(
+ "Overwrites are not implemented for
DataFrame::write_parquet.".to_owned(),
+ ));
+ }
+ match options.compression{
+ CompressionTypeVariant::UNCOMPRESSED => (),
+ _ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned()))
+ }
+ let props = match writer_properties {
+ Some(props) => props,
+ None => default_builder(self.session_state.config_options())?.build(),
+ };
+ let file_type_writer_options =
+ FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props));
+ let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
FileType::PARQUET,
- false,
- // TODO implement options
- StatementOptions::new(vec![]),
+ options.single_file_output,
+ copy_options,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
@@ -1030,14 +1091,22 @@ impl DataFrame {
pub async fn write_json(
self,
path: &str,
+ options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
+ if options.overwrite {
+ return Err(DataFusionError::NotImplemented(
+ "Overwrites are not implemented for
DataFrame::write_json.".to_owned(),
+ ));
+ }
+ let file_type_writer_options =
+ FileTypeWriterOptions::JSON(JsonWriterOptions::new(options.compression));
+ let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
FileType::JSON,
- false,
- // TODO implement options
- StatementOptions::new(vec![]),
+ options.single_file_output,
+ copy_options,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
@@ -1249,6 +1318,11 @@ mod tests {
WindowFunction,
};
use datafusion_physical_expr::expressions::Column;
+ use object_store::local::LocalFileSystem;
+ use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+ use parquet::file::reader::FileReader;
+ use tempfile::TempDir;
+ use url::Url;
use crate::execution::context::SessionConfig;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
@@ -2292,4 +2366,53 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn write_parquet_with_compression() -> Result<()> {
+ let test_df = test_table().await?;
+
+ let output_path = "file://local/test.parquet";
+ let test_compressions = vec![
+ parquet::basic::Compression::SNAPPY,
+ parquet::basic::Compression::LZ4,
+ parquet::basic::Compression::LZ4_RAW,
+ parquet::basic::Compression::GZIP(GzipLevel::default()),
+ parquet::basic::Compression::BROTLI(BrotliLevel::default()),
+ parquet::basic::Compression::ZSTD(ZstdLevel::default()),
+ ];
+ for compression in test_compressions.into_iter() {
+ let df = test_df.clone();
+ let tmp_dir = TempDir::new()?;
+ let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+ let local_url = Url::parse("file://local").unwrap();
+ let ctx = &test_df.session_state;
+ ctx.runtime_env().register_object_store(&local_url, local);
+ df.write_parquet(
+ output_path,
+ DataFrameWriteOptions::new().with_single_file_output(true),
+ Some(
+ WriterProperties::builder()
+ .set_compression(compression)
+ .build(),
+ ),
+ )
+ .await?;
+
+ // Check that file actually used the specified compression
+ let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+
+ let reader =
+ parquet::file::serialized_reader::SerializedFileReader::new(file)
+ .unwrap();
+
+ let parquet_metadata = reader.metadata();
+
+ let written_compression =
+ parquet_metadata.row_group(0).column(0).compression();
+
+ assert_eq!(written_compression, compression);
+ }
+
+ Ok(())
+ }
}
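Every call site in this diff passes None for the optional writer properties, which falls back to WriterBuilder::new() for CSV and the session-derived default_builder() for Parquet. Passing explicit properties looks roughly like the sketch below (the output path is hypothetical, and with_delimiter is the arrow-rs WriterBuilder method, assumed present in the arrow version pinned here):

    use arrow::csv::WriterBuilder;

    // pipe-delimited output instead of the default comma
    df.write_csv(
        "/tmp/pipe_delimited/",
        DataFrameWriteOptions::new(),
        Some(WriterBuilder::new().with_delimiter(b'|')),
    )
    .await?;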
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 9f670431cb..e692810381 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -577,6 +577,7 @@ pub async fn plan_to_csv(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::dataframe::DataFrameWriteOptions;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
@@ -1071,7 +1072,7 @@ mod tests {
let out_dir_url = "file://local/out";
let e = df
- .write_csv(out_dir_url)
+ .write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
.await
.expect_err("should fail because input file does not match
inferred schema");
assert_eq!("Arrow error: Parser error: Error while parsing value d for
column 0 at line 4", format!("{e}"));
@@ -1106,7 +1107,8 @@ mod tests {
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
- df.write_csv(out_dir_url).await?;
+ df.write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
+ .await?;
// create a new context and verify that the results were saved to a partitioned csv file
let ctx = SessionContext::new();
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 4ad268a976..ec5a4c1cef 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -321,6 +321,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use crate::assert_batches_eq;
+ use crate::dataframe::DataFrameWriteOptions;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -696,7 +697,8 @@ mod tests {
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let df = ctx.sql("SELECT a, b FROM test").await?;
- df.write_json(out_dir_url).await?;
+ df.write_json(out_dir_url, DataFrameWriteOptions::new())
+ .await?;
// create a new context and verify that the results were saved to a partitioned csv file
let ctx = SessionContext::new();
@@ -789,7 +791,7 @@ mod tests {
let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
let out_dir_url = "file://local/out";
let e = df
- .write_json(out_dir_url)
+ .write_json(out_dir_url, DataFrameWriteOptions::new())
.await
.expect_err("should fail because input file does not match
inferred schema");
assert_eq!("Arrow error: Parser error: Error while parsing value d for
column 0 at line 4", format!("{e}"));
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index d973f13fad..861a37a302 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -736,6 +736,7 @@ mod tests {
// See also `parquet_exec` integration test
use super::*;
+ use crate::dataframe::DataFrameWriteOptions;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
@@ -932,7 +933,7 @@ mod tests {
let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
let out_dir_url = "file://local/out";
let e = df
- .write_parquet(out_dir_url, None)
+ .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
.await
.expect_err("should fail because input file does not match
inferred schema");
assert_eq!("Arrow error: Parser error: Error while parsing value d for
column 0 at line 4", format!("{e}"));
@@ -1951,7 +1952,8 @@ mod tests {
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
- df.write_parquet(out_dir_url, None).await?;
+ df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
+ .await?;
// write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;
// create a new context and verify that the results were saved to a partitioned parquet file
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 25b11b1f97..3a59f40ede 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -40,7 +40,7 @@ use crate::logical_expr::{
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::FileTypeWriterOptions;
use datafusion_common::FileType;
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyOptions, CopyTo};
use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
@@ -557,7 +557,7 @@ impl DefaultPhysicalPlanner {
output_url,
file_format,
single_file_output,
- statement_options,
+ copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
@@ -569,10 +569,15 @@ impl DefaultPhysicalPlanner {
let schema: Schema = (**input.schema()).clone().into();
- let file_type_writer_options = FileTypeWriterOptions::build(
- file_format,
- session_state.config_options(),
- statement_options)?;
+ let file_type_writer_options = match copy_options{
+ CopyOptions::SQLOptions(statement_options) => {
+ FileTypeWriterOptions::build(
+ file_format,
+ session_state.config_options(),
+ statement_options)?
+ },
+ CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
+ };
// Set file sink related options
let config = FileSinkConfig {
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d3cc42afbe..eb7833504b 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,7 +17,7 @@
//! This module provides a builder for creating LogicalPlans
-use crate::dml::CopyTo;
+use crate::dml::{CopyOptions, CopyTo};
use crate::expr::Alias;
use crate::expr_rewriter::{
coerce_plan_expr_for_schema, normalize_col,
@@ -41,7 +41,6 @@ use crate::{
Expr, ExprSchemable, TableSource,
};
use arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion_common::file_options::StatementOptions;
use datafusion_common::plan_err;
use datafusion_common::UnnestOptions;
use datafusion_common::{
@@ -240,14 +239,14 @@ impl LogicalPlanBuilder {
output_url: String,
file_format: FileType,
single_file_output: bool,
- statement_options: StatementOptions,
+ copy_options: CopyOptions,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
file_format,
single_file_output,
- statement_options,
+ copy_options,
})))
}
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 501f2eeba1..4cd56b89ac 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -21,7 +21,8 @@ use std::{
};
use datafusion_common::{
- file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference,
+ file_options::StatementOptions, DFSchemaRef, FileType, FileTypeWriterOptions,
+ OwnedTableReference,
};
use crate::LogicalPlan;
@@ -40,7 +41,47 @@ pub struct CopyTo {
/// to which each output partition is written to its own output file
pub single_file_output: bool,
/// Arbitrary options as tuples
- pub statement_options: StatementOptions,
+ pub copy_options: CopyOptions,
+}
+
+/// When the logical plan is constructed from SQL, CopyOptions
+/// will contain arbitrary string tuples which must be parsed into
+/// FileTypeWriterOptions. When the logical plan is constructed directly
+/// from rust code (such as via the DataFrame API), FileTypeWriterOptions
+/// can be provided directly, avoiding the runtime cost and fallibility of
+/// parsing string-based options.
+#[derive(Clone)]
+pub enum CopyOptions {
+ /// Holds StatementOptions parsed from a SQL statement
+ SQLOptions(StatementOptions),
+ /// Holds FileTypeWriterOptions directly provided
+ WriterOptions(Box<FileTypeWriterOptions>),
+}
+
+impl PartialEq for CopyOptions {
+ fn eq(&self, other: &CopyOptions) -> bool {
+ match self {
+ Self::SQLOptions(statement1) => match other {
+ Self::SQLOptions(statement2) => statement1.eq(statement2),
+ Self::WriterOptions(_) => false,
+ },
+ Self::WriterOptions(_) => false,
+ }
+ }
+}
+
+impl Eq for CopyOptions {}
+
+impl std::hash::Hash for CopyOptions {
+ fn hash<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
+ match self {
+ Self::SQLOptions(statement) => statement.hash(hasher),
+ Self::WriterOptions(_) => (),
+ }
+ }
}
/// The operator that modifies the content of a database (adapted from
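A note on the manual PartialEq/Hash impls above: FileTypeWriterOptions does not itself implement those traits, so any WriterOptions value compares unequal to everything (including itself) and contributes nothing to the hash; only SQL-derived options participate in plan equality. Constructing each variant, sketched from the code in this diff:

    use datafusion_common::file_options::json_writer::JsonWriterOptions;
    use datafusion_common::file_options::StatementOptions;
    use datafusion_common::parsers::CompressionTypeVariant;
    use datafusion_common::FileTypeWriterOptions;
    use datafusion_expr::dml::CopyOptions;

    // SQL path: raw key/value pairs, parsed later during physical planning
    let from_sql = CopyOptions::SQLOptions(StatementOptions::new(vec![]));

    // DataFrame path: pre-built writer options, no string parsing needed
    let direct = CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::JSON(
        JsonWriterOptions::new(CompressionTypeVariant::UNCOMPRESSED),
    )));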
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b17db245e6..083ee230c7 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -17,6 +17,7 @@
//! Logical plan types
+use crate::dml::CopyOptions;
use crate::expr::{Alias, Exists, InSubquery, Placeholder};
use crate::expr_rewriter::create_col_from_scalar_expr;
use crate::expr_vec_fmt;
@@ -1118,15 +1119,18 @@ impl LogicalPlan {
output_url,
file_format,
single_file_output,
- statement_options,
+ copy_options,
}) => {
- let op_str = statement_options
- .clone()
- .into_inner()
- .iter()
- .map(|(k, v)| format!("{k} {v}"))
- .collect::<Vec<String>>()
- .join(", ");
+ let op_str = match copy_options {
+ CopyOptions::SQLOptions(statement) => statement
+ .clone()
+ .into_inner()
+ .iter()
+ .map(|(k, v)| format!("{k} {v}"))
+ .collect::<Vec<String>>()
+ .join(", "),
+ CopyOptions::WriterOptions(_) => "".into(),
+ };
write!(f, "CopyTo: format={file_format}
output_url={output_url} single_file_output={single_file_output} options:
({op_str})")
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index fd6a65b312..be48418cb8 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -756,13 +756,13 @@ pub fn from_plan(
output_url,
file_format,
single_file_output,
- statement_options,
+ copy_options,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs[0].clone()),
output_url: output_url.clone(),
file_format: file_format.clone(),
single_file_output: *single_file_output,
- statement_options: statement_options.clone(),
+ copy_options: copy_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 655442d7e3..0b0c391134 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -32,7 +32,7 @@ use datafusion_common::{
DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, Result,
SchemaReference, TableReference, ToDFSchema,
};
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr::Placeholder;
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
@@ -617,12 +617,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// COPY defaults to outputting a single file if not otherwise specified
let single_file_output = single_file_output.unwrap_or(true);
+ let copy_options = CopyOptions::SQLOptions(statement_options);
+
Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: statement.target,
file_format,
single_file_output,
- statement_options,
+ copy_options,
}))
}