This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fca61ef9b Enrich CSV reader config: quote & escape (#6927)
8fca61ef9b is described below

commit 8fca61ef9ba9dd46bbb2e3f6bc2ade2b3894c2df
Author: parkma99 <[email protected]>
AuthorDate: Tue Jul 25 21:11:30 2023 +0800

    Enrich CSV reader config: quote & escape (#6927)
    
    * add quote
    
    * add escape
    
    * fix
    
    * fix
    
    * fix
    
    * fix fmt
    
    * fix error
    
    * add test
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-examples/examples/csv_opener.rs         |   1 +
 datafusion/core/src/datasource/file_format/csv.rs  |  30 +++++
 .../core/src/datasource/file_format/options.rs     |  20 ++++
 .../core/src/datasource/physical_plan/csv.rs       |  82 ++++++++++++--
 .../core/src/physical_optimizer/repartition.rs     |   8 ++
 .../replace_repartition_execs.rs                   |   2 +
 datafusion/core/src/test/mod.rs                    |   4 +
 datafusion/core/tests/sql/csv_files.rs             | 125 +++++++++++++++++++++
 datafusion/core/tests/sql/mod.rs                   |   1 +
 datafusion/proto/proto/datafusion.proto            |   8 ++
 datafusion/proto/src/generated/pbjson.rs           |  76 +++++++++++++
 datafusion/proto/src/generated/prost.rs            |  26 +++++
 datafusion/proto/src/logical_plan/mod.rs           |  24 +++-
 datafusion/proto/src/physical_plan/mod.rs          |  19 +++-
 14 files changed, 413 insertions(+), 13 deletions(-)

diff --git a/datafusion-examples/examples/csv_opener.rs 
b/datafusion-examples/examples/csv_opener.rs
index f2982522a7..3b5a68d246 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -45,6 +45,7 @@ async fn main() -> Result<()> {
         Some(vec![12, 0]),
         true,
         b',',
+        b'"',
         object_store,
     );
 
diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index b284079ec6..ee67003d2f 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -61,6 +61,8 @@ pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
 pub struct CsvFormat {
     has_header: bool,
     delimiter: u8,
+    quote: u8,
+    escape: Option<u8>,
     schema_infer_max_rec: Option<usize>,
     file_compression_type: FileCompressionType,
 }
@@ -71,6 +73,8 @@ impl Default for CsvFormat {
             schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
             has_header: true,
             delimiter: b',',
+            quote: b'"',
+            escape: None,
             file_compression_type: FileCompressionType::UNCOMPRESSED,
         }
     }
@@ -159,6 +163,20 @@ impl CsvFormat {
         self
     }
 
+    /// Set the byte used as the quote character when reading CSV rows.
+    /// - defaults to `b'"'`
+    pub fn with_quote(mut self, quote: u8) -> Self {
+        self.quote = quote;
+        self
+    }
+
+    /// Set the byte used as the escape character when reading CSV rows.
+    /// - defaults to `None`
+    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
+        self.escape = escape;
+        self
+    }
+
     /// Set a `FileCompressionType` of CSV
     /// - defaults to `FileCompressionType::UNCOMPRESSED`
     pub fn with_file_compression_type(
@@ -173,6 +191,16 @@ impl CsvFormat {
     pub fn delimiter(&self) -> u8 {
         self.delimiter
     }
+
+    /// The quote character.
+    pub fn quote(&self) -> u8 {
+        self.quote
+    }
+
+    /// The escape character.
+    pub fn escape(&self) -> Option<u8> {
+        self.escape
+    }
 }
 
 #[async_trait]
@@ -227,6 +255,8 @@ impl FileFormat for CsvFormat {
             conf,
             self.has_header,
             self.delimiter,
+            self.quote,
+            self.escape,
             self.file_compression_type.to_owned(),
         );
         Ok(Arc::new(exec))
diff --git a/datafusion/core/src/datasource/file_format/options.rs 
b/datafusion/core/src/datasource/file_format/options.rs
index 6155dc6640..69449e9f8d 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -55,6 +55,10 @@ pub struct CsvReadOptions<'a> {
     pub has_header: bool,
     /// An optional column delimiter. Defaults to `b','`.
     pub delimiter: u8,
+    /// An optional quote character. Defaults to `b'"'`.
+    pub quote: u8,
+    /// An optional escape character. Defaults to None.
+    pub escape: Option<u8>,
     /// An optional schema representing the CSV files. If None, CSV reader 
will try to infer it
     /// based on data in file.
     pub schema: Option<&'a Schema>,
@@ -85,6 +89,8 @@ impl<'a> CsvReadOptions<'a> {
             schema: None,
             schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
             delimiter: b',',
+            quote: b'"',
+            escape: None,
             file_extension: DEFAULT_CSV_EXTENSION,
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
@@ -110,6 +116,18 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Specify quote to use for CSV read
+    pub fn quote(mut self, quote: u8) -> Self {
+        self.quote = quote;
+        self
+    }
+
+    /// Specify escape character to use for CSV read
+    pub fn escape(mut self, escape: u8) -> Self {
+        self.escape = Some(escape);
+        self
+    }
+
     /// Specify the file extension for CSV file selection
     pub fn file_extension(mut self, file_extension: &'a str) -> Self {
         self.file_extension = file_extension;
@@ -435,6 +453,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
         let file_format = CsvFormat::default()
             .with_has_header(self.has_header)
             .with_delimiter(self.delimiter)
+            .with_quote(self.quote)
+            .with_escape(self.escape)
             .with_schema_infer_max_rec(Some(self.schema_infer_max_records))
             .with_file_compression_type(self.file_compression_type.to_owned());
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 9a7602b792..6ef92bed4a 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -59,6 +59,8 @@ pub struct CsvExec {
     projected_output_ordering: Vec<LexOrdering>,
     has_header: bool,
     delimiter: u8,
+    quote: u8,
+    escape: Option<u8>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Compression type of the file associated with CsvExec
@@ -71,6 +73,8 @@ impl CsvExec {
         base_config: FileScanConfig,
         has_header: bool,
         delimiter: u8,
+        quote: u8,
+        escape: Option<u8>,
         file_compression_type: FileCompressionType,
     ) -> Self {
         let (projected_schema, projected_statistics, 
projected_output_ordering) =
@@ -83,6 +87,8 @@ impl CsvExec {
             projected_output_ordering,
             has_header,
             delimiter,
+            quote,
+            escape,
             metrics: ExecutionPlanMetricsSet::new(),
             file_compression_type,
         }
@@ -101,6 +107,16 @@ impl CsvExec {
         self.delimiter
     }
 
+    /// The quote character
+    pub fn quote(&self) -> u8 {
+        self.quote
+    }
+
+    /// The escape character
+    pub fn escape(&self) -> Option<u8> {
+        self.escape
+    }
+
     /// Redistribute files across partitions according to their size
     /// See comments on `repartition_file_groups()` for more detail.
     ///
@@ -203,6 +219,8 @@ impl ExecutionPlan for CsvExec {
             file_projection: self.base_config.file_column_projection_indices(),
             has_header: self.has_header,
             delimiter: self.delimiter,
+            quote: self.quote,
+            escape: self.escape,
             object_store,
         });
 
@@ -232,6 +250,8 @@ pub struct CsvConfig {
     file_projection: Option<Vec<usize>>,
     has_header: bool,
     delimiter: u8,
+    quote: u8,
+    escape: Option<u8>,
     object_store: Arc<dyn ObjectStore>,
 }
 
@@ -243,6 +263,7 @@ impl CsvConfig {
         file_projection: Option<Vec<usize>>,
         has_header: bool,
         delimiter: u8,
+        quote: u8,
         object_store: Arc<dyn ObjectStore>,
     ) -> Self {
         Self {
@@ -251,6 +272,8 @@ impl CsvConfig {
             file_projection,
             has_header,
             delimiter,
+            quote,
+            escape: None,
             object_store,
         }
     }
@@ -261,8 +284,11 @@ impl CsvConfig {
         let mut builder = csv::ReaderBuilder::new(self.file_schema.clone())
             .has_header(self.has_header)
             .with_delimiter(self.delimiter)
+            .with_quote(self.quote)
             .with_batch_size(self.batch_size);
-
+        if let Some(escape) = self.escape {
+            builder = builder.with_escape(escape);
+        }
         if let Some(p) = &self.file_projection {
             builder = builder.with_projection(p.clone());
         }
@@ -662,7 +688,14 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.projection = Some(vec![0, 2, 4]);
 
-        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        let csv = CsvExec::new(
+            config,
+            true,
+            b',',
+            b'"',
+            None,
+            file_compression_type.to_owned(),
+        );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
@@ -718,7 +751,14 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.projection = Some(vec![4, 0, 2]);
 
-        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        let csv = CsvExec::new(
+            config,
+            true,
+            b',',
+            b'"',
+            None,
+            file_compression_type.to_owned(),
+        );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
@@ -774,7 +814,14 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.limit = Some(5);
 
-        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        let csv = CsvExec::new(
+            config,
+            true,
+            b',',
+            b'"',
+            None,
+            file_compression_type.to_owned(),
+        );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
@@ -830,7 +877,14 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
         config.limit = Some(5);
 
-        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        let csv = CsvExec::new(
+            config,
+            true,
+            b',',
+            b'"',
+            None,
+            file_compression_type.to_owned(),
+        );
         assert_eq!(14, csv.base_config.file_schema.fields().len());
         assert_eq!(14, csv.projected_schema.fields().len());
         assert_eq!(14, csv.schema().fields().len());
@@ -884,7 +938,14 @@ mod tests {
 
         // we don't have `/date=xx/` in the path but that is ok because
         // partitions are resolved during scan anyway
-        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        let csv = CsvExec::new(
+            config,
+            true,
+            b',',
+            b'"',
+            None,
+            file_compression_type.to_owned(),
+        );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(2, csv.projected_schema.fields().len());
         assert_eq!(2, csv.schema().fields().len());
@@ -970,7 +1031,14 @@ mod tests {
         .unwrap();
 
         let config = partitioned_csv_config(file_schema, file_groups).unwrap();
-        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        let csv = CsvExec::new(
+            config,
+            true,
+            b',',
+            b'"',
+            None,
+            file_compression_type.to_owned(),
+        );
 
         let it = csv.execute(0, task_ctx).unwrap();
         let batches: Vec<_> = it.try_collect().await.unwrap();
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 866686fe1e..aa48fd77a8 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -396,6 +396,8 @@ mod tests {
             scan_config(false, true),
             false,
             b',',
+            b'"',
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -411,6 +413,8 @@ mod tests {
             scan_config(false, false),
             false,
             b',',
+            b'"',
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -426,6 +430,8 @@ mod tests {
             scan_config(true, true),
             false,
             b',',
+            b'"',
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -992,6 +998,8 @@ mod tests {
                 scan_config(false, true),
                 false,
                 b',',
+                b'"',
+                None,
                 compression_type,
             )));
 
diff --git 
a/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs 
b/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
index faab482156..19f95b0e7b 100644
--- a/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
+++ b/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
@@ -773,6 +773,8 @@ mod tests {
             },
             true,
             0,
+            b'"',
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 838c13f968..6e2bdfeeca 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -89,6 +89,8 @@ pub fn scan_partitioned_csv(partitions: usize) -> 
Result<Arc<CsvExec>> {
         config,
         true,
         b',',
+        b'"',
+        None,
         FileCompressionType::UNCOMPRESSED,
     )))
 }
@@ -348,6 +350,8 @@ pub fn csv_exec_sorted(
         },
         false,
         0,
+        0,
+        None,
         FileCompressionType::UNCOMPRESSED,
     ))
 }
diff --git a/datafusion/core/tests/sql/csv_files.rs 
b/datafusion/core/tests/sql/csv_files.rs
new file mode 100644
index 0000000000..5ed0068d61
--- /dev/null
+++ b/datafusion/core/tests/sql/csv_files.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::*;
+
+#[tokio::test]
+async fn csv_custom_quote() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = SessionContext::new();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, false),
+        Field::new("c2", DataType::Utf8, false),
+    ]));
+    let filename = format!("partition.{}", "csv");
+    let file_path = tmp_dir.path().join(filename);
+    let mut file = File::create(file_path)?;
+
+    // write ten rows whose fields are wrapped in `~` instead of the default `"`
+    for index in 0..10 {
+        let text1 = format!("id{index:}");
+        let text2 = format!("value{index:}");
+        let data = format!("~{text1}~,~{text2}~\r\n");
+        file.write_all(data.as_bytes())?;
+    }
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new()
+            .schema(&schema)
+            .has_header(false)
+            .quote(b'~'),
+    )
+    .await?;
+
+    let results = plan_and_collect(&ctx, "SELECT * from test").await?;
+
+    let expected = vec![
+        "+-----+--------+",
+        "| c1  | c2     |",
+        "+-----+--------+",
+        "| id0 | value0 |",
+        "| id1 | value1 |",
+        "| id2 | value2 |",
+        "| id3 | value3 |",
+        "| id4 | value4 |",
+        "| id5 | value5 |",
+        "| id6 | value6 |",
+        "| id7 | value7 |",
+        "| id8 | value8 |",
+        "| id9 | value9 |",
+        "+-----+--------+",
+    ];
+
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_custom_escape() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = SessionContext::new();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, false),
+        Field::new("c2", DataType::Utf8, false),
+    ]));
+    let filename = format!("partition.{}", "csv");
+    let file_path = tmp_dir.path().join(filename);
+    let mut file = File::create(file_path)?;
+
+    // write ten rows whose second field contains a backslash-escaped `"`
+    for index in 0..10 {
+        let text1 = format!("id{index:}");
+        let text2 = format!("value\\\"{index:}");
+        let data = format!("\"{text1}\",\"{text2}\"\r\n");
+        file.write_all(data.as_bytes())?;
+    }
+
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new()
+            .schema(&schema)
+            .has_header(false)
+            .escape(b'\\'),
+    )
+    .await?;
+
+    let results = plan_and_collect(&ctx, "SELECT * from test").await?;
+
+    let expected = vec![
+        "+-----+---------+",
+        "| c1  | c2      |",
+        "+-----+---------+",
+        "| id0 | value\"0 |",
+        "| id1 | value\"1 |",
+        "| id2 | value\"2 |",
+        "| id3 | value\"3 |",
+        "| id4 | value\"4 |",
+        "| id5 | value\"5 |",
+        "| id6 | value\"6 |",
+        "| id7 | value\"7 |",
+        "| id8 | value\"8 |",
+        "| id9 | value\"9 |",
+        "+-----+---------+",
+    ];
+
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index ca0cfc3dbe..f1ab0ccea9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -80,6 +80,7 @@ pub mod aggregates;
 pub mod arrow_files;
 #[cfg(feature = "avro")]
 pub mod create_drop;
+pub mod csv_files;
 pub mod explain_analyze;
 pub mod expr;
 pub mod group_by;
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index f7247effdd..f00c184056 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -88,6 +88,10 @@ message ProjectionColumns {
 message CsvFormat {
   bool has_header = 1;
   string delimiter = 2;
+  string quote = 3;
+  oneof optional_escape {
+    string escape = 4;
+  }
 }
 
 message ParquetFormat {
@@ -1269,6 +1273,10 @@ message CsvScanExecNode {
   FileScanExecConf base_conf = 1;
   bool has_header = 2;
   string delimiter = 3;
+  string quote = 4;
+  oneof optional_escape {
+    string escape = 5;
+  }
 }
 
 message AvroScanExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index aaf6bb97bb..2bac658f04 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4191,6 +4191,12 @@ impl serde::Serialize for CsvFormat {
         if !self.delimiter.is_empty() {
             len += 1;
         }
+        if !self.quote.is_empty() {
+            len += 1;
+        }
+        if self.optional_escape.is_some() {
+            len += 1;
+        }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.CsvFormat", len)?;
         if self.has_header {
             struct_ser.serialize_field("hasHeader", &self.has_header)?;
@@ -4198,6 +4204,16 @@ impl serde::Serialize for CsvFormat {
         if !self.delimiter.is_empty() {
             struct_ser.serialize_field("delimiter", &self.delimiter)?;
         }
+        if !self.quote.is_empty() {
+            struct_ser.serialize_field("quote", &self.quote)?;
+        }
+        if let Some(v) = self.optional_escape.as_ref() {
+            match v {
+                csv_format::OptionalEscape::Escape(v) => {
+                    struct_ser.serialize_field("escape", v)?;
+                }
+            }
+        }
         struct_ser.end()
     }
 }
@@ -4211,12 +4227,16 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
             "has_header",
             "hasHeader",
             "delimiter",
+            "quote",
+            "escape",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             HasHeader,
             Delimiter,
+            Quote,
+            Escape,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -4240,6 +4260,8 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
                         match value {
                             "hasHeader" | "has_header" => 
Ok(GeneratedField::HasHeader),
                             "delimiter" => Ok(GeneratedField::Delimiter),
+                            "quote" => Ok(GeneratedField::Quote),
+                            "escape" => Ok(GeneratedField::Escape),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -4261,6 +4283,8 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
             {
                 let mut has_header__ = None;
                 let mut delimiter__ = None;
+                let mut quote__ = None;
+                let mut optional_escape__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::HasHeader => {
@@ -4275,11 +4299,25 @@ impl<'de> serde::Deserialize<'de> for CsvFormat {
                             }
                             delimiter__ = Some(map.next_value()?);
                         }
+                        GeneratedField::Quote => {
+                            if quote__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("quote"));
+                            }
+                            quote__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::Escape => {
+                            if optional_escape__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("escape"));
+                            }
+                            optional_escape__ = 
map.next_value::<::std::option::Option<_>>()?.map(csv_format::OptionalEscape::Escape);
+                        }
                     }
                 }
                 Ok(CsvFormat {
                     has_header: has_header__.unwrap_or_default(),
                     delimiter: delimiter__.unwrap_or_default(),
+                    quote: quote__.unwrap_or_default(),
+                    optional_escape: optional_escape__,
                 })
             }
         }
@@ -4303,6 +4341,12 @@ impl serde::Serialize for CsvScanExecNode {
         if !self.delimiter.is_empty() {
             len += 1;
         }
+        if !self.quote.is_empty() {
+            len += 1;
+        }
+        if self.optional_escape.is_some() {
+            len += 1;
+        }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.CsvScanExecNode", len)?;
         if let Some(v) = self.base_conf.as_ref() {
             struct_ser.serialize_field("baseConf", v)?;
@@ -4313,6 +4357,16 @@ impl serde::Serialize for CsvScanExecNode {
         if !self.delimiter.is_empty() {
             struct_ser.serialize_field("delimiter", &self.delimiter)?;
         }
+        if !self.quote.is_empty() {
+            struct_ser.serialize_field("quote", &self.quote)?;
+        }
+        if let Some(v) = self.optional_escape.as_ref() {
+            match v {
+                csv_scan_exec_node::OptionalEscape::Escape(v) => {
+                    struct_ser.serialize_field("escape", v)?;
+                }
+            }
+        }
         struct_ser.end()
     }
 }
@@ -4328,6 +4382,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             "has_header",
             "hasHeader",
             "delimiter",
+            "quote",
+            "escape",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -4335,6 +4391,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             BaseConf,
             HasHeader,
             Delimiter,
+            Quote,
+            Escape,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -4359,6 +4417,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                             "baseConf" | "base_conf" => 
Ok(GeneratedField::BaseConf),
                             "hasHeader" | "has_header" => 
Ok(GeneratedField::HasHeader),
                             "delimiter" => Ok(GeneratedField::Delimiter),
+                            "quote" => Ok(GeneratedField::Quote),
+                            "escape" => Ok(GeneratedField::Escape),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -4381,6 +4441,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                 let mut base_conf__ = None;
                 let mut has_header__ = None;
                 let mut delimiter__ = None;
+                let mut quote__ = None;
+                let mut optional_escape__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::BaseConf => {
@@ -4401,12 +4463,26 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                             }
                             delimiter__ = Some(map.next_value()?);
                         }
+                        GeneratedField::Quote => {
+                            if quote__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("quote"));
+                            }
+                            quote__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::Escape => {
+                            if optional_escape__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("escape"));
+                            }
+                            optional_escape__ = 
map.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape);
+                        }
                     }
                 }
                 Ok(CsvScanExecNode {
                     base_conf: base_conf__,
                     has_header: has_header__.unwrap_or_default(),
                     delimiter: delimiter__.unwrap_or_default(),
+                    quote: quote__.unwrap_or_default(),
+                    optional_escape: optional_escape__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index e1ad6acec8..801162ab19 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -122,6 +122,19 @@ pub struct CsvFormat {
     pub has_header: bool,
     #[prost(string, tag = "2")]
     pub delimiter: ::prost::alloc::string::String,
+    #[prost(string, tag = "3")]
+    pub quote: ::prost::alloc::string::String,
+    #[prost(oneof = "csv_format::OptionalEscape", tags = "4")]
+    pub optional_escape: ::core::option::Option<csv_format::OptionalEscape>,
+}
+/// Nested message and enum types in `CsvFormat`.
+pub mod csv_format {
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum OptionalEscape {
+        #[prost(string, tag = "4")]
+        Escape(::prost::alloc::string::String),
+    }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -1784,6 +1797,19 @@ pub struct CsvScanExecNode {
     pub has_header: bool,
     #[prost(string, tag = "3")]
     pub delimiter: ::prost::alloc::string::String,
+    #[prost(string, tag = "4")]
+    pub quote: ::prost::alloc::string::String,
+    #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
+    pub optional_escape: 
::core::option::Option<csv_scan_exec_node::OptionalEscape>,
+}
+/// Nested message and enum types in `CsvScanExecNode`.
+pub mod csv_scan_exec_node {
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum OptionalEscape {
+        #[prost(string, tag = "5")]
+        Escape(::prost::alloc::string::String),
+    }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index b25f470f8d..27f9389678 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -348,11 +348,17 @@ impl AsLogicalPlan for LogicalPlanNode {
                         FileFormatType::Csv(protobuf::CsvFormat {
                             has_header,
                             delimiter,
-                        }) => Arc::new(
-                            CsvFormat::default()
-                                .with_has_header(*has_header)
-                                .with_delimiter(str_to_byte(delimiter)?),
-                        ),
+                            quote,
+                            optional_escape
+                        }) => {
+                            let mut csv = CsvFormat::default()
+                            .with_has_header(*has_header)
+                            .with_delimiter(str_to_byte(delimiter)?)
+                            .with_quote(str_to_byte(quote)?);
+                            if let 
Some(protobuf::csv_format::OptionalEscape::Escape(escape)) = optional_escape {
+                                csv = csv.with_escape(Some(str_to_byte(escape)?));
+                            }
+                            Arc::new(csv)},
                         FileFormatType::Avro(..) => Arc::new(AvroFormat),
                     };
 
@@ -846,6 +852,14 @@ impl AsLogicalPlan for LogicalPlanNode {
                         FileFormatType::Csv(protobuf::CsvFormat {
                             delimiter: byte_to_string(csv.delimiter())?,
                             has_header: csv.has_header(),
+                            quote: byte_to_string(csv.quote())?,
+                            optional_escape: if let Some(escape) = 
csv.escape() {
+                                
Some(protobuf::csv_format::OptionalEscape::Escape(
+                                    byte_to_string(escape)?,
+                                ))
+                            } else {
+                                None
+                            },
                         })
                     } else if any.is::<AvroFormat>() {
                         FileFormatType::Avro(protobuf::AvroFormat {})
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 9e9b391a6c..cebe4eddcc 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -51,7 +51,7 @@ use datafusion_common::{DataFusionError, Result};
 use prost::bytes::BufMut;
 use prost::Message;
 
-use crate::common::proto_error;
+use crate::common::{byte_to_string, proto_error};
 use crate::common::{csv_delimiter_to_string, str_to_byte};
 use crate::physical_plan::from_proto::{
     parse_physical_expr, parse_physical_sort_expr, 
parse_protobuf_file_scan_config,
@@ -156,6 +156,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 )?,
                 scan.has_header,
                 str_to_byte(&scan.delimiter)?,
+                str_to_byte(&scan.quote)?,
+                if let 
Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
+                    escape,
+                )) = &scan.optional_escape
+                {
+                    Some(str_to_byte(escape)?)
+                } else {
+                    None
+                },
                 FileCompressionType::UNCOMPRESSED,
             ))),
             PhysicalPlanType::ParquetScan(scan) => {
@@ -1071,6 +1080,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         base_conf: Some(exec.base_config().try_into()?),
                         has_header: exec.has_header(),
                         delimiter: csv_delimiter_to_string(exec.delimiter())?,
+                        quote: byte_to_string(exec.quote())?,
+                        optional_escape: if let Some(escape) = exec.escape() {
+                            
Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
+                                byte_to_string(escape)?,
+                            ))
+                        } else {
+                            None
+                        },
                     },
                 )),
             })

Reply via email to