This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f39c040ace Add serde support for CSV FileTypeWriterOptions (#8641)
f39c040ace is described below
commit f39c040ace0b34b0775827907aa01d6bb71cbb14
Author: Andy Grove <[email protected]>
AuthorDate: Thu Dec 28 11:38:16 2023 -0700
Add serde support for CSV FileTypeWriterOptions (#8641)
---
datafusion/proto/proto/datafusion.proto | 18 ++
datafusion/proto/src/generated/pbjson.rs | 213 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 29 ++-
datafusion/proto/src/logical_plan/mod.rs | 74 +++++++
datafusion/proto/src/physical_plan/from_proto.rs | 12 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 64 ++++++-
6 files changed, 406 insertions(+), 4 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d02fc8e91b..59b82efcbb 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1207,6 +1207,7 @@ message FileTypeWriterOptions {
oneof FileType {
JsonWriterOptions json_options = 1;
ParquetWriterOptions parquet_options = 2;
+ CsvWriterOptions csv_options = 3;
}
}
@@ -1218,6 +1219,23 @@ message ParquetWriterOptions {
WriterProperties writer_properties = 1;
}
+message CsvWriterOptions {
+ // Optional column delimiter. Defaults to `b','`
+ string delimiter = 1;
+ // Whether to write column names as file headers. Defaults to `true`
+ bool has_header = 2;
+ // Optional date format for date arrays
+ string date_format = 3;
+ // Optional datetime format for datetime arrays
+ string datetime_format = 4;
+ // Optional timestamp format for timestamp arrays
+ string timestamp_format = 5;
+ // Optional time format for time arrays
+ string time_format = 6;
+ // Optional value to represent null
+ string null_value = 7;
+}
+
message WriterProperties {
uint64 data_page_size_limit = 1;
uint64 dictionary_page_size_limit = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index f860b1f1e6..956244ffdb 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -5151,6 +5151,205 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
deserializer.deserialize_struct("datafusion.CsvScanExecNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for CsvWriterOptions {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.delimiter.is_empty() {
+ len += 1;
+ }
+ if self.has_header {
+ len += 1;
+ }
+ if !self.date_format.is_empty() {
+ len += 1;
+ }
+ if !self.datetime_format.is_empty() {
+ len += 1;
+ }
+ if !self.timestamp_format.is_empty() {
+ len += 1;
+ }
+ if !self.time_format.is_empty() {
+ len += 1;
+ }
+ if !self.null_value.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.CsvWriterOptions", len)?;
+ if !self.delimiter.is_empty() {
+ struct_ser.serialize_field("delimiter", &self.delimiter)?;
+ }
+ if self.has_header {
+ struct_ser.serialize_field("hasHeader", &self.has_header)?;
+ }
+ if !self.date_format.is_empty() {
+ struct_ser.serialize_field("dateFormat", &self.date_format)?;
+ }
+ if !self.datetime_format.is_empty() {
+ struct_ser.serialize_field("datetimeFormat",
&self.datetime_format)?;
+ }
+ if !self.timestamp_format.is_empty() {
+ struct_ser.serialize_field("timestampFormat",
&self.timestamp_format)?;
+ }
+ if !self.time_format.is_empty() {
+ struct_ser.serialize_field("timeFormat", &self.time_format)?;
+ }
+ if !self.null_value.is_empty() {
+ struct_ser.serialize_field("nullValue", &self.null_value)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for CsvWriterOptions {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "delimiter",
+ "has_header",
+ "hasHeader",
+ "date_format",
+ "dateFormat",
+ "datetime_format",
+ "datetimeFormat",
+ "timestamp_format",
+ "timestampFormat",
+ "time_format",
+ "timeFormat",
+ "null_value",
+ "nullValue",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Delimiter,
+ HasHeader,
+ DateFormat,
+ DatetimeFormat,
+ TimestampFormat,
+ TimeFormat,
+ NullValue,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "delimiter" => Ok(GeneratedField::Delimiter),
+ "hasHeader" | "has_header" =>
Ok(GeneratedField::HasHeader),
+ "dateFormat" | "date_format" =>
Ok(GeneratedField::DateFormat),
+ "datetimeFormat" | "datetime_format" =>
Ok(GeneratedField::DatetimeFormat),
+ "timestampFormat" | "timestamp_format" =>
Ok(GeneratedField::TimestampFormat),
+ "timeFormat" | "time_format" =>
Ok(GeneratedField::TimeFormat),
+ "nullValue" | "null_value" =>
Ok(GeneratedField::NullValue),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = CsvWriterOptions;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.CsvWriterOptions")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<CsvWriterOptions, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut delimiter__ = None;
+ let mut has_header__ = None;
+ let mut date_format__ = None;
+ let mut datetime_format__ = None;
+ let mut timestamp_format__ = None;
+ let mut time_format__ = None;
+ let mut null_value__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Delimiter => {
+ if delimiter__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("delimiter"));
+ }
+ delimiter__ = Some(map_.next_value()?);
+ }
+ GeneratedField::HasHeader => {
+ if has_header__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("hasHeader"));
+ }
+ has_header__ = Some(map_.next_value()?);
+ }
+ GeneratedField::DateFormat => {
+ if date_format__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("dateFormat"));
+ }
+ date_format__ = Some(map_.next_value()?);
+ }
+ GeneratedField::DatetimeFormat => {
+ if datetime_format__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("datetimeFormat"));
+ }
+ datetime_format__ = Some(map_.next_value()?);
+ }
+ GeneratedField::TimestampFormat => {
+ if timestamp_format__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("timestampFormat"));
+ }
+ timestamp_format__ = Some(map_.next_value()?);
+ }
+ GeneratedField::TimeFormat => {
+ if time_format__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("timeFormat"));
+ }
+ time_format__ = Some(map_.next_value()?);
+ }
+ GeneratedField::NullValue => {
+ if null_value__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("nullValue"));
+ }
+ null_value__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(CsvWriterOptions {
+ delimiter: delimiter__.unwrap_or_default(),
+ has_header: has_header__.unwrap_or_default(),
+ date_format: date_format__.unwrap_or_default(),
+ datetime_format: datetime_format__.unwrap_or_default(),
+ timestamp_format: timestamp_format__.unwrap_or_default(),
+ time_format: time_format__.unwrap_or_default(),
+ null_value: null_value__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.CsvWriterOptions", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for CubeNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -7893,6 +8092,9 @@ impl serde::Serialize for FileTypeWriterOptions {
file_type_writer_options::FileType::ParquetOptions(v) => {
struct_ser.serialize_field("parquetOptions", v)?;
}
+ file_type_writer_options::FileType::CsvOptions(v) => {
+ struct_ser.serialize_field("csvOptions", v)?;
+ }
}
}
struct_ser.end()
@@ -7909,12 +8111,15 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
"jsonOptions",
"parquet_options",
"parquetOptions",
+ "csv_options",
+ "csvOptions",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
JsonOptions,
ParquetOptions,
+ CsvOptions,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -7938,6 +8143,7 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
match value {
"jsonOptions" | "json_options" =>
Ok(GeneratedField::JsonOptions),
"parquetOptions" | "parquet_options" =>
Ok(GeneratedField::ParquetOptions),
+ "csvOptions" | "csv_options" =>
Ok(GeneratedField::CsvOptions),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -7972,6 +8178,13 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
return
Err(serde::de::Error::duplicate_field("parquetOptions"));
}
file_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::ParquetOptions)
+;
+ }
+ GeneratedField::CsvOptions => {
+ if file_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("csvOptions"));
+ }
+ file_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::CsvOptions)
;
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 459d5a965c..32e892e663 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1642,7 +1642,7 @@ pub struct PartitionColumn {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileTypeWriterOptions {
- #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2")]
+ #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2, 3")]
pub file_type: ::core::option::Option<file_type_writer_options::FileType>,
}
/// Nested message and enum types in `FileTypeWriterOptions`.
@@ -1654,6 +1654,8 @@ pub mod file_type_writer_options {
JsonOptions(super::JsonWriterOptions),
#[prost(message, tag = "2")]
ParquetOptions(super::ParquetWriterOptions),
+ #[prost(message, tag = "3")]
+ CsvOptions(super::CsvWriterOptions),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -1670,6 +1672,31 @@ pub struct ParquetWriterOptions {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CsvWriterOptions {
+ /// Optional column delimiter. Defaults to `b','`
+ #[prost(string, tag = "1")]
+ pub delimiter: ::prost::alloc::string::String,
+ /// Whether to write column names as file headers. Defaults to `true`
+ #[prost(bool, tag = "2")]
+ pub has_header: bool,
+ /// Optional date format for date arrays
+ #[prost(string, tag = "3")]
+ pub date_format: ::prost::alloc::string::String,
+ /// Optional datetime format for datetime arrays
+ #[prost(string, tag = "4")]
+ pub datetime_format: ::prost::alloc::string::String,
+ /// Optional timestamp format for timestamp arrays
+ #[prost(string, tag = "5")]
+ pub timestamp_format: ::prost::alloc::string::String,
+ /// Optional time format for time arrays
+ #[prost(string, tag = "6")]
+ pub time_format: ::prost::alloc::string::String,
+ /// Optional value to represent null
+ #[prost(string, tag = "7")]
+ pub null_value: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WriterProperties {
#[prost(uint64, tag = "1")]
pub data_page_size_limit: u64,
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index d137a41fa1..e997bcde42 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::csv::WriterBuilder;
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
@@ -64,6 +65,7 @@ use datafusion_expr::{
};
use datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
+use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_expr::dml::CopyOptions;
use prost::bytes::BufMut;
@@ -846,6 +848,20 @@ impl AsLogicalPlan for LogicalPlanNode {
Some(copy_to_node::CopyOptions::WriterOptions(opt)) => {
match &opt.file_type {
Some(ft) => match ft {
+ file_type_writer_options::FileType::CsvOptions(
+ writer_options,
+ ) => {
+ let writer_builder =
+
csv_writer_options_from_proto(writer_options)?;
+ CopyOptions::WriterOptions(Box::new(
+ FileTypeWriterOptions::CSV(
+ CsvWriterOptions::new(
+ writer_builder,
+
CompressionTypeVariant::UNCOMPRESSED,
+ ),
+ ),
+ ))
+ }
file_type_writer_options::FileType::ParquetOptions(
writer_options,
) => {
@@ -1630,6 +1646,40 @@ impl AsLogicalPlan for LogicalPlanNode {
}
CopyOptions::WriterOptions(opt) => {
match opt.as_ref() {
+ FileTypeWriterOptions::CSV(csv_opts) => {
+ let csv_options = &csv_opts.writer_options;
+ let csv_writer_options =
protobuf::CsvWriterOptions {
+ delimiter: (csv_options.delimiter() as
char)
+ .to_string(),
+ has_header: csv_options.header(),
+ date_format: csv_options
+ .date_format()
+ .unwrap_or("")
+ .to_owned(),
+ datetime_format: csv_options
+ .datetime_format()
+ .unwrap_or("")
+ .to_owned(),
+ timestamp_format: csv_options
+ .timestamp_format()
+ .unwrap_or("")
+ .to_owned(),
+ time_format: csv_options
+ .time_format()
+ .unwrap_or("")
+ .to_owned(),
+ null_value:
csv_options.null().to_owned(),
+ };
+ let csv_options =
+
file_type_writer_options::FileType::CsvOptions(
+ csv_writer_options,
+ );
+
Some(copy_to_node::CopyOptions::WriterOptions(
+ protobuf::FileTypeWriterOptions {
+ file_type: Some(csv_options),
+ },
+ ))
+ }
FileTypeWriterOptions::Parquet(parquet_opts)
=> {
let parquet_writer_options =
protobuf::ParquetWriterOptions {
@@ -1674,6 +1724,30 @@ impl AsLogicalPlan for LogicalPlanNode {
}
}
+pub(crate) fn csv_writer_options_from_proto(
+ writer_options: &protobuf::CsvWriterOptions,
+) -> Result<WriterBuilder> {
+ let mut builder = WriterBuilder::new();
+ if !writer_options.delimiter.is_empty() {
+ if let Some(delimiter) = writer_options.delimiter.chars().next() {
+ if delimiter.is_ascii() {
+ builder = builder.with_delimiter(delimiter as u8);
+ } else {
+ return Err(proto_error("CSV Delimiter is not ASCII"));
+ }
+ } else {
+ return Err(proto_error("Error parsing CSV Delimiter"));
+ }
+ }
+ Ok(builder
+ .with_header(writer_options.has_header)
+ .with_date_format(writer_options.date_format.clone())
+ .with_datetime_format(writer_options.datetime_format.clone())
+ .with_timestamp_format(writer_options.timestamp_format.clone())
+ .with_time_format(writer_options.time_format.clone())
+ .with_null(writer_options.null_value.clone()))
+}
+
pub(crate) fn writer_properties_to_proto(
props: &WriterProperties,
) -> protobuf::WriterProperties {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 824eb60a57..6f1e811510 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -39,6 +39,7 @@ use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::{
functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics,
WindowExpr,
};
+use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
@@ -53,7 +54,7 @@ use crate::logical_plan;
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;
-use crate::logical_plan::writer_properties_from_proto;
+use crate::logical_plan::{csv_writer_options_from_proto,
writer_properties_from_proto};
use chrono::{TimeZone, Utc};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -766,11 +767,18 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for
FileTypeWriterOptions {
let file_type = value
.file_type
.as_ref()
- .ok_or_else(|| proto_error("Missing required field in protobuf"))?;
+ .ok_or_else(|| proto_error("Missing required file_type field in
protobuf"))?;
match file_type {
protobuf::file_type_writer_options::FileType::JsonOptions(opts) =>
Ok(
Self::JSON(JsonWriterOptions::new(opts.compression().into())),
),
+ protobuf::file_type_writer_options::FileType::CsvOptions(opt) => {
+ let write_options = csv_writer_options_from_proto(opt)?;
+ Ok(Self::CSV(CsvWriterOptions::new(
+ write_options,
+ CompressionTypeVariant::UNCOMPRESSED,
+ )))
+ }
protobuf::file_type_writer_options::FileType::ParquetOptions(opt)
=> {
let props = opt.writer_properties.clone().unwrap_or_default();
let writer_properties = writer_properties_from_proto(&props)?;
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 3eeae01a64..2d7d85abda 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -20,6 +20,7 @@ use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use arrow::array::{ArrayRef, FixedSizeListArray};
+use arrow::csv::WriterBuilder;
use arrow::datatypes::{
DataType, Field, Fields, Int32Type, IntervalDayTimeType,
IntervalMonthDayNanoType,
IntervalUnit, Schema, SchemaRef, TimeUnit, UnionFields, UnionMode,
@@ -35,8 +36,10 @@ use
datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig,
SessionContext};
use datafusion::test_util::{TestTableFactory, TestTableProvider};
+use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::file_options::StatementOptions;
+use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{internal_err, not_impl_err, plan_err,
FileTypeWriterOptions};
use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError,
ScalarValue};
use datafusion_common::{FileType, Result};
@@ -386,10 +389,69 @@ async fn roundtrip_logical_plan_copy_to_writer_options()
-> Result<()> {
}
_ => panic!(),
}
-
Ok(())
}
+#[tokio::test]
+async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let input = create_csv_scan(&ctx).await?;
+
+ let writer_properties = WriterBuilder::new()
+ .with_delimiter(b'*')
+ .with_date_format("dd/MM/yyyy".to_string())
+ .with_datetime_format("dd/MM/yyyy HH:mm:ss".to_string())
+ .with_timestamp_format("HH:mm:ss.SSSSSS".to_string())
+ .with_time_format("HH:mm:ss".to_string())
+ .with_null("NIL".to_string());
+
+ let plan = LogicalPlan::Copy(CopyTo {
+ input: Arc::new(input),
+ output_url: "test.csv".to_string(),
+ file_format: FileType::CSV,
+ single_file_output: true,
+ copy_options:
CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
+ CsvWriterOptions::new(
+ writer_properties,
+ CompressionTypeVariant::UNCOMPRESSED,
+ ),
+ ))),
+ });
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+
+ match logical_round_trip {
+ LogicalPlan::Copy(copy_to) => {
+ assert_eq!("test.csv", copy_to.output_url);
+ assert_eq!(FileType::CSV, copy_to.file_format);
+ assert!(copy_to.single_file_output);
+            match &copy_to.copy_options {
+ CopyOptions::WriterOptions(y) => match y.as_ref() {
+ FileTypeWriterOptions::CSV(p) => {
+ let props = &p.writer_options;
+ assert_eq!(b'*', props.delimiter());
+ assert_eq!("dd/MM/yyyy", props.date_format().unwrap());
+ assert_eq!(
+ "dd/MM/yyyy HH:mm:ss",
+ props.datetime_format().unwrap()
+ );
+ assert_eq!("HH:mm:ss.SSSSSS",
props.timestamp_format().unwrap());
+ assert_eq!("HH:mm:ss", props.time_format().unwrap());
+ assert_eq!("NIL", props.null());
+ }
+ _ => panic!(),
+ },
+ _ => panic!(),
+ }
+ }
+ _ => panic!(),
+ }
+
+ Ok(())
+}
async fn create_csv_scan(ctx: &SessionContext) -> Result<LogicalPlan,
DataFusionError> {
ctx.register_csv("t1", "tests/testdata/test.csv",
CsvReadOptions::default())
.await?;