This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8fca61ef9b Enrich CSV reader config: quote & escape (#6927)
8fca61ef9b is described below
commit 8fca61ef9ba9dd46bbb2e3f6bc2ade2b3894c2df
Author: parkma99 <[email protected]>
AuthorDate: Tue Jul 25 21:11:30 2023 +0800
Enrich CSV reader config: quote & escape (#6927)
* add quote
* add escape
* fix
* fix
* fix
* fix fmt
* fix error
* add test
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-examples/examples/csv_opener.rs | 1 +
datafusion/core/src/datasource/file_format/csv.rs | 30 +++++
.../core/src/datasource/file_format/options.rs | 20 ++++
.../core/src/datasource/physical_plan/csv.rs | 82 ++++++++++++--
.../core/src/physical_optimizer/repartition.rs | 8 ++
.../replace_repartition_execs.rs | 2 +
datafusion/core/src/test/mod.rs | 4 +
datafusion/core/tests/sql/csv_files.rs | 125 +++++++++++++++++++++
datafusion/core/tests/sql/mod.rs | 1 +
datafusion/proto/proto/datafusion.proto | 8 ++
datafusion/proto/src/generated/pbjson.rs | 76 +++++++++++++
datafusion/proto/src/generated/prost.rs | 26 +++++
datafusion/proto/src/logical_plan/mod.rs | 24 +++-
datafusion/proto/src/physical_plan/mod.rs | 19 +++-
14 files changed, 413 insertions(+), 13 deletions(-)
diff --git a/datafusion-examples/examples/csv_opener.rs
b/datafusion-examples/examples/csv_opener.rs
index f2982522a7..3b5a68d246 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -45,6 +45,7 @@ async fn main() -> Result<()> {
Some(vec![12, 0]),
true,
b',',
+ b'"',
object_store,
);
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index b284079ec6..ee67003d2f 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -61,6 +61,8 @@ pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
pub struct CsvFormat {
has_header: bool,
delimiter: u8,
+ quote: u8,
+ escape: Option<u8>,
schema_infer_max_rec: Option<usize>,
file_compression_type: FileCompressionType,
}
@@ -71,6 +73,8 @@ impl Default for CsvFormat {
schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
has_header: true,
delimiter: b',',
+ quote: b'"',
+ escape: None,
file_compression_type: FileCompressionType::UNCOMPRESSED,
}
}
@@ -159,6 +163,20 @@ impl CsvFormat {
self
}
+ /// The quote character in a row.
+ /// - defaults to '"'
+ pub fn with_quote(mut self, quote: u8) -> Self {
+ self.quote = quote;
+ self
+ }
+
+ /// The escape character in a row.
+ /// - default is None
+ pub fn with_escape(mut self, escape: Option<u8>) -> Self {
+ self.escape = escape;
+ self
+ }
+
/// Set a `FileCompressionType` of CSV
/// - defaults to `FileCompressionType::UNCOMPRESSED`
pub fn with_file_compression_type(
@@ -173,6 +191,16 @@ impl CsvFormat {
pub fn delimiter(&self) -> u8 {
self.delimiter
}
+
+ /// The quote character.
+ pub fn quote(&self) -> u8 {
+ self.quote
+ }
+
+ /// The escape character.
+ pub fn escape(&self) -> Option<u8> {
+ self.escape
+ }
}
#[async_trait]
@@ -227,6 +255,8 @@ impl FileFormat for CsvFormat {
conf,
self.has_header,
self.delimiter,
+ self.quote,
+ self.escape,
self.file_compression_type.to_owned(),
);
Ok(Arc::new(exec))
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index 6155dc6640..69449e9f8d 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -55,6 +55,10 @@ pub struct CsvReadOptions<'a> {
pub has_header: bool,
/// An optional column delimiter. Defaults to `b','`.
pub delimiter: u8,
+ /// An optional quote character. Defaults to `b'"'`.
+ pub quote: u8,
+ /// An optional escape character. Defaults to None.
+ pub escape: Option<u8>,
/// An optional schema representing the CSV files. If None, CSV reader
will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
@@ -85,6 +89,8 @@ impl<'a> CsvReadOptions<'a> {
schema: None,
schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
delimiter: b',',
+ quote: b'"',
+ escape: None,
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
@@ -110,6 +116,18 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Specify quote to use for CSV read
+ pub fn quote(mut self, quote: u8) -> Self {
+ self.quote = quote;
+ self
+ }
+
+ /// Specify escape character to use for CSV read
+ pub fn escape(mut self, escape: u8) -> Self {
+ self.escape = Some(escape);
+ self
+ }
+
/// Specify the file extension for CSV file selection
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
self.file_extension = file_extension;
@@ -435,6 +453,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
let file_format = CsvFormat::default()
.with_has_header(self.has_header)
.with_delimiter(self.delimiter)
+ .with_quote(self.quote)
+ .with_escape(self.escape)
.with_schema_infer_max_rec(Some(self.schema_infer_max_records))
.with_file_compression_type(self.file_compression_type.to_owned());
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 9a7602b792..6ef92bed4a 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -59,6 +59,8 @@ pub struct CsvExec {
projected_output_ordering: Vec<LexOrdering>,
has_header: bool,
delimiter: u8,
+ quote: u8,
+ escape: Option<u8>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
@@ -71,6 +73,8 @@ impl CsvExec {
base_config: FileScanConfig,
has_header: bool,
delimiter: u8,
+ quote: u8,
+ escape: Option<u8>,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics,
projected_output_ordering) =
@@ -83,6 +87,8 @@ impl CsvExec {
projected_output_ordering,
has_header,
delimiter,
+ quote,
+ escape,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
}
@@ -101,6 +107,16 @@ impl CsvExec {
self.delimiter
}
+ /// The quote character
+ pub fn quote(&self) -> u8 {
+ self.quote
+ }
+
+ /// The escape character
+ pub fn escape(&self) -> Option<u8> {
+ self.escape
+ }
+
/// Redistribute files across partitions according to their size
/// See comments on `repartition_file_groups()` for more detail.
///
@@ -203,6 +219,8 @@ impl ExecutionPlan for CsvExec {
file_projection: self.base_config.file_column_projection_indices(),
has_header: self.has_header,
delimiter: self.delimiter,
+ quote: self.quote,
+ escape: self.escape,
object_store,
});
@@ -232,6 +250,8 @@ pub struct CsvConfig {
file_projection: Option<Vec<usize>>,
has_header: bool,
delimiter: u8,
+ quote: u8,
+ escape: Option<u8>,
object_store: Arc<dyn ObjectStore>,
}
@@ -243,6 +263,7 @@ impl CsvConfig {
file_projection: Option<Vec<usize>>,
has_header: bool,
delimiter: u8,
+ quote: u8,
object_store: Arc<dyn ObjectStore>,
) -> Self {
Self {
@@ -251,6 +272,8 @@ impl CsvConfig {
file_projection,
has_header,
delimiter,
+ quote,
+ escape: None,
object_store,
}
}
@@ -261,8 +284,11 @@ impl CsvConfig {
let mut builder = csv::ReaderBuilder::new(self.file_schema.clone())
.has_header(self.has_header)
.with_delimiter(self.delimiter)
+ .with_quote(self.quote)
.with_batch_size(self.batch_size);
-
+ if let Some(escape) = self.escape {
+ builder = builder.with_escape(escape);
+ }
if let Some(p) = &self.file_projection {
builder = builder.with_projection(p.clone());
}
@@ -662,7 +688,14 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.projection = Some(vec![0, 2, 4]);
- let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
+ let csv = CsvExec::new(
+ config,
+ true,
+ b',',
+ b'"',
+ None,
+ file_compression_type.to_owned(),
+ );
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(3, csv.projected_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());
@@ -718,7 +751,14 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.projection = Some(vec![4, 0, 2]);
- let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
+ let csv = CsvExec::new(
+ config,
+ true,
+ b',',
+ b'"',
+ None,
+ file_compression_type.to_owned(),
+ );
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(3, csv.projected_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());
@@ -774,7 +814,14 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.limit = Some(5);
- let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
+ let csv = CsvExec::new(
+ config,
+ true,
+ b',',
+ b'"',
+ None,
+ file_compression_type.to_owned(),
+ );
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(13, csv.projected_schema.fields().len());
assert_eq!(13, csv.schema().fields().len());
@@ -830,7 +877,14 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
config.limit = Some(5);
- let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
+ let csv = CsvExec::new(
+ config,
+ true,
+ b',',
+ b'"',
+ None,
+ file_compression_type.to_owned(),
+ );
assert_eq!(14, csv.base_config.file_schema.fields().len());
assert_eq!(14, csv.projected_schema.fields().len());
assert_eq!(14, csv.schema().fields().len());
@@ -884,7 +938,14 @@ mod tests {
// we don't have `/date=xx/` in the path but that is ok because
// partitions are resolved during scan anyway
- let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
+ let csv = CsvExec::new(
+ config,
+ true,
+ b',',
+ b'"',
+ None,
+ file_compression_type.to_owned(),
+ );
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(2, csv.projected_schema.fields().len());
assert_eq!(2, csv.schema().fields().len());
@@ -970,7 +1031,14 @@ mod tests {
.unwrap();
let config = partitioned_csv_config(file_schema, file_groups).unwrap();
- let csv = CsvExec::new(config, true, b',',
file_compression_type.to_owned());
+ let csv = CsvExec::new(
+ config,
+ true,
+ b',',
+ b'"',
+ None,
+ file_compression_type.to_owned(),
+ );
let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 866686fe1e..aa48fd77a8 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -396,6 +396,8 @@ mod tests {
scan_config(false, true),
false,
b',',
+ b'"',
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -411,6 +413,8 @@ mod tests {
scan_config(false, false),
false,
b',',
+ b'"',
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -426,6 +430,8 @@ mod tests {
scan_config(true, true),
false,
b',',
+ b'"',
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -992,6 +998,8 @@ mod tests {
scan_config(false, true),
false,
b',',
+ b'"',
+ None,
compression_type,
)));
diff --git
a/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
b/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
index faab482156..19f95b0e7b 100644
--- a/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
+++ b/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
@@ -773,6 +773,8 @@ mod tests {
},
true,
0,
+ b'"',
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 838c13f968..6e2bdfeeca 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -89,6 +89,8 @@ pub fn scan_partitioned_csv(partitions: usize) ->
Result<Arc<CsvExec>> {
config,
true,
b',',
+ b'"',
+ None,
FileCompressionType::UNCOMPRESSED,
)))
}
@@ -348,6 +350,8 @@ pub fn csv_exec_sorted(
},
false,
0,
+ 0,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/core/tests/sql/csv_files.rs
b/datafusion/core/tests/sql/csv_files.rs
new file mode 100644
index 0000000000..5ed0068d61
--- /dev/null
+++ b/datafusion/core/tests/sql/csv_files.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::*;
+
+#[tokio::test]
+async fn csv_custom_quote() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = SessionContext::new();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Utf8, false),
+ ]));
+ let filename = format!("partition.{}", "csv");
+ let file_path = tmp_dir.path().join(filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for index in 0..10 {
+ let text1 = format!("id{index:}");
+ let text2 = format!("value{index:}");
+ let data = format!("~{text1}~,~{text2}~\r\n");
+ file.write_all(data.as_bytes())?;
+ }
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new()
+ .schema(&schema)
+ .has_header(false)
+ .quote(b'~'),
+ )
+ .await?;
+
+ let results = plan_and_collect(&ctx, "SELECT * from test").await?;
+
+ let expected = vec![
+ "+-----+--------+",
+ "| c1 | c2 |",
+ "+-----+--------+",
+ "| id0 | value0 |",
+ "| id1 | value1 |",
+ "| id2 | value2 |",
+ "| id3 | value3 |",
+ "| id4 | value4 |",
+ "| id5 | value5 |",
+ "| id6 | value6 |",
+ "| id7 | value7 |",
+ "| id8 | value8 |",
+ "| id9 | value9 |",
+ "+-----+--------+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_custom_escape() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = SessionContext::new();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Utf8, false),
+ ]));
+ let filename = format!("partition.{}", "csv");
+ let file_path = tmp_dir.path().join(filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for index in 0..10 {
+ let text1 = format!("id{index:}");
+ let text2 = format!("value\\\"{index:}");
+ let data = format!("\"{text1}\",\"{text2}\"\r\n");
+ file.write_all(data.as_bytes())?;
+ }
+
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new()
+ .schema(&schema)
+ .has_header(false)
+ .escape(b'\\'),
+ )
+ .await?;
+
+ let results = plan_and_collect(&ctx, "SELECT * from test").await?;
+
+ let expected = vec![
+ "+-----+---------+",
+ "| c1 | c2 |",
+ "+-----+---------+",
+ "| id0 | value\"0 |",
+ "| id1 | value\"1 |",
+ "| id2 | value\"2 |",
+ "| id3 | value\"3 |",
+ "| id4 | value\"4 |",
+ "| id5 | value\"5 |",
+ "| id6 | value\"6 |",
+ "| id7 | value\"7 |",
+ "| id8 | value\"8 |",
+ "| id9 | value\"9 |",
+ "+-----+---------+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index ca0cfc3dbe..f1ab0ccea9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -80,6 +80,7 @@ pub mod aggregates;
pub mod arrow_files;
#[cfg(feature = "avro")]
pub mod create_drop;
+pub mod csv_files;
pub mod explain_analyze;
pub mod expr;
pub mod group_by;
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index f7247effdd..f00c184056 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -88,6 +88,10 @@ message ProjectionColumns {
message CsvFormat {
bool has_header = 1;
string delimiter = 2;
+ string quote = 3;
+ oneof optional_escape {
+ string escape = 4;
+ }
}
message ParquetFormat {
@@ -1269,6 +1273,10 @@ message CsvScanExecNode {
FileScanExecConf base_conf = 1;
bool has_header = 2;
string delimiter = 3;
+ string quote = 4;
+ oneof optional_escape {
+ string escape = 5;
+ }
}
message AvroScanExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index aaf6bb97bb..2bac658f04 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4191,6 +4191,12 @@ impl serde::Serialize for CsvFormat {
if !self.delimiter.is_empty() {
len += 1;
}
+ if !self.quote.is_empty() {
+ len += 1;
+ }
+ if self.optional_escape.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.CsvFormat", len)?;
if self.has_header {
struct_ser.serialize_field("hasHeader", &self.has_header)?;
@@ -4198,6 +4204,16 @@ impl serde::Serialize for CsvFormat {
if !self.delimiter.is_empty() {
struct_ser.serialize_field("delimiter", &self.delimiter)?;
}
+ if !self.quote.is_empty() {
+ struct_ser.serialize_field("quote", &self.quote)?;
+ }
+ if let Some(v) = self.optional_escape.as_ref() {
+ match v {
+ csv_format::OptionalEscape::Escape(v) => {
+ struct_ser.serialize_field("escape", v)?;
+ }
+ }
+ }
struct_ser.end()
}
}
@@ -4211,12 +4227,16 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
"has_header",
"hasHeader",
"delimiter",
+ "quote",
+ "escape",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
HasHeader,
Delimiter,
+ Quote,
+ Escape,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -4240,6 +4260,8 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
match value {
"hasHeader" | "has_header" =>
Ok(GeneratedField::HasHeader),
"delimiter" => Ok(GeneratedField::Delimiter),
+ "quote" => Ok(GeneratedField::Quote),
+ "escape" => Ok(GeneratedField::Escape),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -4261,6 +4283,8 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
{
let mut has_header__ = None;
let mut delimiter__ = None;
+ let mut quote__ = None;
+ let mut optional_escape__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::HasHeader => {
@@ -4275,11 +4299,25 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
}
delimiter__ = Some(map.next_value()?);
}
+ GeneratedField::Quote => {
+ if quote__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("quote"));
+ }
+ quote__ = Some(map.next_value()?);
+ }
+ GeneratedField::Escape => {
+ if optional_escape__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("escape"));
+ }
+ optional_escape__ =
map.next_value::<::std::option::Option<_>>()?.map(csv_format::OptionalEscape::Escape);
+ }
}
}
Ok(CsvFormat {
has_header: has_header__.unwrap_or_default(),
delimiter: delimiter__.unwrap_or_default(),
+ quote: quote__.unwrap_or_default(),
+ optional_escape: optional_escape__,
})
}
}
@@ -4303,6 +4341,12 @@ impl serde::Serialize for CsvScanExecNode {
if !self.delimiter.is_empty() {
len += 1;
}
+ if !self.quote.is_empty() {
+ len += 1;
+ }
+ if self.optional_escape.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.CsvScanExecNode", len)?;
if let Some(v) = self.base_conf.as_ref() {
struct_ser.serialize_field("baseConf", v)?;
@@ -4313,6 +4357,16 @@ impl serde::Serialize for CsvScanExecNode {
if !self.delimiter.is_empty() {
struct_ser.serialize_field("delimiter", &self.delimiter)?;
}
+ if !self.quote.is_empty() {
+ struct_ser.serialize_field("quote", &self.quote)?;
+ }
+ if let Some(v) = self.optional_escape.as_ref() {
+ match v {
+ csv_scan_exec_node::OptionalEscape::Escape(v) => {
+ struct_ser.serialize_field("escape", v)?;
+ }
+ }
+ }
struct_ser.end()
}
}
@@ -4328,6 +4382,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
"has_header",
"hasHeader",
"delimiter",
+ "quote",
+ "escape",
];
#[allow(clippy::enum_variant_names)]
@@ -4335,6 +4391,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
BaseConf,
HasHeader,
Delimiter,
+ Quote,
+ Escape,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -4359,6 +4417,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
"baseConf" | "base_conf" =>
Ok(GeneratedField::BaseConf),
"hasHeader" | "has_header" =>
Ok(GeneratedField::HasHeader),
"delimiter" => Ok(GeneratedField::Delimiter),
+ "quote" => Ok(GeneratedField::Quote),
+ "escape" => Ok(GeneratedField::Escape),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -4381,6 +4441,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
let mut base_conf__ = None;
let mut has_header__ = None;
let mut delimiter__ = None;
+ let mut quote__ = None;
+ let mut optional_escape__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::BaseConf => {
@@ -4401,12 +4463,26 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
}
delimiter__ = Some(map.next_value()?);
}
+ GeneratedField::Quote => {
+ if quote__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("quote"));
+ }
+ quote__ = Some(map.next_value()?);
+ }
+ GeneratedField::Escape => {
+ if optional_escape__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("escape"));
+ }
+ optional_escape__ =
map.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape);
+ }
}
}
Ok(CsvScanExecNode {
base_conf: base_conf__,
has_header: has_header__.unwrap_or_default(),
delimiter: delimiter__.unwrap_or_default(),
+ quote: quote__.unwrap_or_default(),
+ optional_escape: optional_escape__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index e1ad6acec8..801162ab19 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -122,6 +122,19 @@ pub struct CsvFormat {
pub has_header: bool,
#[prost(string, tag = "2")]
pub delimiter: ::prost::alloc::string::String,
+ #[prost(string, tag = "3")]
+ pub quote: ::prost::alloc::string::String,
+ #[prost(oneof = "csv_format::OptionalEscape", tags = "4")]
+ pub optional_escape: ::core::option::Option<csv_format::OptionalEscape>,
+}
+/// Nested message and enum types in `CsvFormat`.
+pub mod csv_format {
+ #[allow(clippy::derive_partial_eq_without_eq)]
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum OptionalEscape {
+ #[prost(string, tag = "4")]
+ Escape(::prost::alloc::string::String),
+ }
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1784,6 +1797,19 @@ pub struct CsvScanExecNode {
pub has_header: bool,
#[prost(string, tag = "3")]
pub delimiter: ::prost::alloc::string::String,
+ #[prost(string, tag = "4")]
+ pub quote: ::prost::alloc::string::String,
+ #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
+ pub optional_escape:
::core::option::Option<csv_scan_exec_node::OptionalEscape>,
+}
+/// Nested message and enum types in `CsvScanExecNode`.
+pub mod csv_scan_exec_node {
+ #[allow(clippy::derive_partial_eq_without_eq)]
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum OptionalEscape {
+ #[prost(string, tag = "5")]
+ Escape(::prost::alloc::string::String),
+ }
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index b25f470f8d..27f9389678 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -348,11 +348,17 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Csv(protobuf::CsvFormat {
has_header,
delimiter,
- }) => Arc::new(
- CsvFormat::default()
- .with_has_header(*has_header)
- .with_delimiter(str_to_byte(delimiter)?),
- ),
+ quote,
+ optional_escape
+ }) => {
+ let mut csv = CsvFormat::default()
+ .with_has_header(*has_header)
+ .with_delimiter(str_to_byte(delimiter)?)
+ .with_quote(str_to_byte(quote)?);
+ if let
Some(protobuf::csv_format::OptionalEscape::Escape(escape)) = optional_escape {
+ csv = csv.with_escape(Some(str_to_byte(escape)?));
+ }
+ Arc::new(csv)},
FileFormatType::Avro(..) => Arc::new(AvroFormat),
};
@@ -846,6 +852,14 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Csv(protobuf::CsvFormat {
delimiter: byte_to_string(csv.delimiter())?,
has_header: csv.has_header(),
+ quote: byte_to_string(csv.quote())?,
+ optional_escape: if let Some(escape) =
csv.escape() {
+
Some(protobuf::csv_format::OptionalEscape::Escape(
+ byte_to_string(escape)?,
+ ))
+ } else {
+ None
+ },
})
} else if any.is::<AvroFormat>() {
FileFormatType::Avro(protobuf::AvroFormat {})
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 9e9b391a6c..cebe4eddcc 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -51,7 +51,7 @@ use datafusion_common::{DataFusionError, Result};
use prost::bytes::BufMut;
use prost::Message;
-use crate::common::proto_error;
+use crate::common::{byte_to_string, proto_error};
use crate::common::{csv_delimiter_to_string, str_to_byte};
use crate::physical_plan::from_proto::{
parse_physical_expr, parse_physical_sort_expr,
parse_protobuf_file_scan_config,
@@ -156,6 +156,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
)?,
scan.has_header,
str_to_byte(&scan.delimiter)?,
+ str_to_byte(&scan.quote)?,
+ if let
Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
+ escape,
+ )) = &scan.optional_escape
+ {
+ Some(str_to_byte(escape)?)
+ } else {
+ None
+ },
FileCompressionType::UNCOMPRESSED,
))),
PhysicalPlanType::ParquetScan(scan) => {
@@ -1071,6 +1080,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
base_conf: Some(exec.base_config().try_into()?),
has_header: exec.has_header(),
delimiter: csv_delimiter_to_string(exec.delimiter())?,
+ quote: byte_to_string(exec.quote())?,
+ optional_escape: if let Some(escape) = exec.escape() {
+
Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
+ byte_to_string(escape)?,
+ ))
+ } else {
+ None
+ },
},
)),
})