This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5912025591 Add support for reading CSV files with comments (#10467)
5912025591 is described below

commit 59120255916bcd624161ce8f5df255f2cc838406
Author: Benjamin Bannier <[email protected]>
AuthorDate: Mon Jun 10 17:31:42 2024 +0200

    Add support for reading CSV files with comments (#10467)
    
    This patch adds support for parsing CSV files containing comment lines.
    
    Closes #10262.
---
 datafusion-examples/examples/csv_opener.rs         |  1 +
 datafusion/common/src/config.rs                    |  1 +
 datafusion/core/src/datasource/file_format/csv.rs  | 13 +++++++++++-
 .../core/src/datasource/file_format/options.rs     | 10 +++++++++
 .../core/src/datasource/physical_plan/csv.rs       | 22 ++++++++++++++++++++
 .../src/physical_optimizer/enforce_distribution.rs |  3 +++
 .../src/physical_optimizer/projection_pushdown.rs  |  3 +++
 .../replace_with_order_preserving_variants.rs      |  1 +
 datafusion/core/src/test/mod.rs                    |  3 +++
 .../proto-common/proto/datafusion_common.proto     |  1 +
 datafusion/proto-common/src/from_proto/mod.rs      |  1 +
 datafusion/proto-common/src/generated/pbjson.rs    | 20 ++++++++++++++++++
 datafusion/proto-common/src/generated/prost.rs     |  3 +++
 datafusion/proto-common/src/to_proto/mod.rs        |  1 +
 datafusion/proto/proto/datafusion.proto            |  3 +++
 .../proto/src/generated/datafusion_proto_common.rs |  3 +++
 datafusion/proto/src/generated/pbjson.rs           | 21 +++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  8 ++++++++
 datafusion/proto/src/physical_plan/mod.rs          | 15 ++++++++++++++
 datafusion/sqllogictest/test_files/csv_files.slt   | 24 ++++++++++++++++++++++
 20 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index d02aa9b308..1f45026a21 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -48,6 +48,7 @@ async fn main() -> Result<()> {
         b',',
         b'"',
         object_store,
+        Some(b'#'),
     );
 
     let opener = CsvOpener::new(Arc::new(config), 
FileCompressionType::UNCOMPRESSED);
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index a4f937b6e2..1c431d04cd 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1567,6 +1567,7 @@ config_namespace! {
         pub timestamp_tz_format: Option<String>, default = None
         pub time_format: Option<String>, default = None
         pub null_value: Option<String>, default = None
+        pub comment: Option<u8>, default = None
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 60192b7f77..2139b35621 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -147,6 +147,12 @@ impl CsvFormat {
         self.options.has_header
     }
 
+    /// Lines beginning with this byte are ignored.
+    pub fn with_comment(mut self, comment: Option<u8>) -> Self {
+        self.options.comment = comment;
+        self
+    }
+
     /// The character separating values within a row.
     /// - default to ','
     pub fn with_delimiter(mut self, delimiter: u8) -> Self {
@@ -252,6 +258,7 @@ impl FileFormat for CsvFormat {
             self.options.delimiter,
             self.options.quote,
             self.options.escape,
+            self.options.comment,
             self.options.compression.into(),
         );
         Ok(Arc::new(exec))
@@ -300,7 +307,7 @@ impl CsvFormat {
         pin_mut!(stream);
 
         while let Some(chunk) = stream.next().await.transpose()? {
-            let format = arrow::csv::reader::Format::default()
+            let mut format = arrow::csv::reader::Format::default()
                 .with_header(
                     first_chunk
                         && self
@@ -310,6 +317,10 @@ impl CsvFormat {
                 )
                 .with_delimiter(self.options.delimiter);
 
+            if let Some(comment) = self.options.comment {
+                format = format.with_comment(comment);
+            }
+
             let (Schema { fields, .. }, records_read) =
                 format.infer_schema(chunk.reader(), Some(records_to_read))?;
 
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index f5bd72495d..c6d143ed67 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> {
     pub quote: u8,
     /// An optional escape character. Defaults to None.
     pub escape: Option<u8>,
+    /// If enabled, lines beginning with this byte are ignored.
+    pub comment: Option<u8>,
     /// An optional schema representing the CSV files. If None, CSV reader 
will try to infer it
     /// based on data in file.
     pub schema: Option<&'a Schema>,
@@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> {
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             file_sort_order: vec![],
+            comment: None,
         }
     }
 
@@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Specify comment char to use for CSV read
+    pub fn comment(mut self, comment: u8) -> Self {
+        self.comment = Some(comment);
+        self
+    }
+
     /// Specify delimiter to use for CSV read
     pub fn delimiter(mut self, delimiter: u8) -> Self {
         self.delimiter = delimiter;
@@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
         let file_format = CsvFormat::default()
             .with_options(table_options.csv)
             .with_has_header(self.has_header)
+            .with_comment(self.comment)
             .with_delimiter(self.delimiter)
             .with_quote(self.quote)
             .with_escape(self.escape)
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 8203e414de..c06c630c45 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -58,6 +58,7 @@ pub struct CsvExec {
     delimiter: u8,
     quote: u8,
     escape: Option<u8>,
+    comment: Option<u8>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Compression type of the file associated with CsvExec
@@ -73,6 +74,7 @@ impl CsvExec {
         delimiter: u8,
         quote: u8,
         escape: Option<u8>,
+        comment: Option<u8>,
         file_compression_type: FileCompressionType,
     ) -> Self {
         let (projected_schema, projected_statistics, 
projected_output_ordering) =
@@ -92,6 +94,7 @@ impl CsvExec {
             metrics: ExecutionPlanMetricsSet::new(),
             file_compression_type,
             cache,
+            comment,
         }
     }
 
@@ -113,6 +116,11 @@ impl CsvExec {
         self.quote
     }
 
+    /// Lines beginning with this byte are ignored.
+    pub fn comment(&self) -> Option<u8> {
+        self.comment
+    }
+
     /// The escape character
     pub fn escape(&self) -> Option<u8> {
         self.escape
@@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec {
             quote: self.quote,
             escape: self.escape,
             object_store,
+            comment: self.comment,
         });
 
         let opener = CsvOpener {
@@ -265,9 +274,11 @@ pub struct CsvConfig {
     quote: u8,
     escape: Option<u8>,
     object_store: Arc<dyn ObjectStore>,
+    comment: Option<u8>,
 }
 
 impl CsvConfig {
+    #[allow(clippy::too_many_arguments)]
     /// Returns a [`CsvConfig`]
     pub fn new(
         batch_size: usize,
@@ -277,6 +288,7 @@ impl CsvConfig {
         delimiter: u8,
         quote: u8,
         object_store: Arc<dyn ObjectStore>,
+        comment: Option<u8>,
     ) -> Self {
         Self {
             batch_size,
@@ -287,6 +299,7 @@ impl CsvConfig {
             quote,
             escape: None,
             object_store,
+            comment,
         }
     }
 }
@@ -309,6 +322,9 @@ impl CsvConfig {
         if let Some(escape) = self.escape {
             builder = builder.with_escape(escape)
         }
+        if let Some(comment) = self.comment {
+            builder = builder.with_comment(comment);
+        }
 
         builder
     }
@@ -570,6 +586,7 @@ mod tests {
             b',',
             b'"',
             None,
+            None,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -636,6 +653,7 @@ mod tests {
             b',',
             b'"',
             None,
+            None,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -702,6 +720,7 @@ mod tests {
             b',',
             b'"',
             None,
+            None,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -765,6 +784,7 @@ mod tests {
             b',',
             b'"',
             None,
+            None,
             file_compression_type.to_owned(),
         );
         assert_eq!(14, csv.base_config.file_schema.fields().len());
@@ -827,6 +847,7 @@ mod tests {
             b',',
             b'"',
             None,
+            None,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -921,6 +942,7 @@ mod tests {
             b',',
             b'"',
             None,
+            None,
             file_compression_type.to_owned(),
         );
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 88fa3a978a..f7c2aee578 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1471,6 +1471,7 @@ pub(crate) mod tests {
             b',',
             b'"',
             None,
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -1494,6 +1495,7 @@ pub(crate) mod tests {
             b',',
             b'"',
             None,
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -3767,6 +3769,7 @@ pub(crate) mod tests {
                     b',',
                     b'"',
                     None,
+                    None,
                     compression_type,
                 )),
                 vec![("a".to_string(), "a".to_string())],
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 335632ab6e..70524dfcea 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -185,6 +185,7 @@ fn try_swapping_with_csv(
             csv.delimiter(),
             csv.quote(),
             csv.escape(),
+            csv.comment(),
             csv.file_compression_type,
         )) as _
     })
@@ -1686,6 +1687,7 @@ mod tests {
             0,
             0,
             None,
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -1708,6 +1710,7 @@ mod tests {
             0,
             0,
             None,
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index e3ef3b95aa..013155b840 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -1502,6 +1502,7 @@ mod tests {
             0,
             b'"',
             None,
+            None,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index f76e1bb60b..e91f83f119 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: 
&Path) -> Result<Arc<Cs
         b',',
         b'"',
         None,
+        None,
         FileCompressionType::UNCOMPRESSED,
     )))
 }
@@ -282,6 +283,7 @@ pub fn csv_exec_sorted(
         0,
         0,
         None,
+        None,
         FileCompressionType::UNCOMPRESSED,
     ))
 }
@@ -337,6 +339,7 @@ pub fn csv_exec_ordered(
         0,
         b'"',
         None,
+        None,
         FileCompressionType::UNCOMPRESSED,
     ))
 }
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index d9ec7dbb51..29a348283f 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -397,6 +397,7 @@ message CsvOptions {
   string timestamp_tz_format = 10; // Optional timestamp with timezone format
   string time_format = 11; // Optional time format
   string null_value = 12; // Optional representation of null value
+  bytes comment = 13; // Optional comment character as a byte
 }
 
 // Options controlling CSV format
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index 3ae70318fa..25c1502ee7 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -867,6 +867,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
                 .then(|| proto_opts.time_format.clone()),
             null_value: (!proto_opts.null_value.is_empty())
                 .then(|| proto_opts.null_value.clone()),
+            comment: proto_opts.comment.first().copied(),
         })
     }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index 6b23724336..6f8409b82a 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1850,6 +1850,9 @@ impl serde::Serialize for CsvOptions {
         if !self.null_value.is_empty() {
             len += 1;
         }
+        if !self.comment.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = 
serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
         if !self.has_header.is_empty() {
             #[allow(clippy::needless_borrow)]
@@ -1894,6 +1897,10 @@ impl serde::Serialize for CsvOptions {
         if !self.null_value.is_empty() {
             struct_ser.serialize_field("nullValue", &self.null_value)?;
         }
+        if !self.comment.is_empty() {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("comment", 
pbjson::private::base64::encode(&self.comment).as_str())?;
+        }
         struct_ser.end()
     }
 }
@@ -1924,6 +1931,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
             "timeFormat",
             "null_value",
             "nullValue",
+            "comment",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -1940,6 +1948,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
             TimestampTzFormat,
             TimeFormat,
             NullValue,
+            Comment,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -1973,6 +1982,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                             "timestampTzFormat" | "timestamp_tz_format" => 
Ok(GeneratedField::TimestampTzFormat),
                             "timeFormat" | "time_format" => 
Ok(GeneratedField::TimeFormat),
                             "nullValue" | "null_value" => 
Ok(GeneratedField::NullValue),
+                            "comment" => Ok(GeneratedField::Comment),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -2004,6 +2014,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                 let mut timestamp_tz_format__ = None;
                 let mut time_format__ = None;
                 let mut null_value__ = None;
+                let mut comment__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::HasHeader => {
@@ -2088,6 +2099,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                             }
                             null_value__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::Comment => {
+                            if comment__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("comment"));
+                            }
+                            comment__ = 
+                                
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+                            ;
+                        }
                     }
                 }
                 Ok(CsvOptions {
@@ -2103,6 +2122,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                     timestamp_tz_format: 
timestamp_tz_format__.unwrap_or_default(),
                     time_format: time_format__.unwrap_or_default(),
                     null_value: null_value__.unwrap_or_default(),
+                    comment: comment__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 48da143bc7..ff17a40738 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -608,6 +608,9 @@ pub struct CsvOptions {
     /// Optional representation of null value
     #[prost(string, tag = "12")]
     pub null_value: ::prost::alloc::string::String,
+    /// Optional comment character as a byte
+    #[prost(bytes = "vec", tag = "13")]
+    pub comment: ::prost::alloc::vec::Vec<u8>,
 }
 /// Options controlling CSV format
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index 28f6952aac..8e7ee9a7d6 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -894,6 +894,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
             timestamp_tz_format: 
opts.timestamp_tz_format.clone().unwrap_or_default(),
             time_format: opts.time_format.clone().unwrap_or_default(),
             null_value: opts.null_value.clone().unwrap_or_default(),
+            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
         })
     }
 }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 9f23824b3a..df1859e25c 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -994,6 +994,9 @@ message CsvScanExecNode {
   oneof optional_escape {
     string escape = 5;
   }
+  oneof optional_comment {
+    string comment = 6;
+  }
 }
 
 message AvroScanExecNode {
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 48da143bc7..ff17a40738 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -608,6 +608,9 @@ pub struct CsvOptions {
     /// Optional representation of null value
     #[prost(string, tag = "12")]
     pub null_value: ::prost::alloc::string::String,
+    /// Optional comment character as a byte
+    #[prost(bytes = "vec", tag = "13")]
+    pub comment: ::prost::alloc::vec::Vec<u8>,
 }
 /// Options controlling CSV format
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 28f80c5ee1..4d0e2d93f9 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3692,6 +3692,9 @@ impl serde::Serialize for CsvScanExecNode {
         if self.optional_escape.is_some() {
             len += 1;
         }
+        if self.optional_comment.is_some() {
+            len += 1;
+        }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.CsvScanExecNode", len)?;
         if let Some(v) = self.base_conf.as_ref() {
             struct_ser.serialize_field("baseConf", v)?;
@@ -3712,6 +3715,13 @@ impl serde::Serialize for CsvScanExecNode {
                 }
             }
         }
+        if let Some(v) = self.optional_comment.as_ref() {
+            match v {
+                csv_scan_exec_node::OptionalComment::Comment(v) => {
+                    struct_ser.serialize_field("comment", v)?;
+                }
+            }
+        }
         struct_ser.end()
     }
 }
@@ -3729,6 +3739,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             "delimiter",
             "quote",
             "escape",
+            "comment",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -3738,6 +3749,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             Delimiter,
             Quote,
             Escape,
+            Comment,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -3764,6 +3776,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                             "delimiter" => Ok(GeneratedField::Delimiter),
                             "quote" => Ok(GeneratedField::Quote),
                             "escape" => Ok(GeneratedField::Escape),
+                            "comment" => Ok(GeneratedField::Comment),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -3788,6 +3801,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                 let mut delimiter__ = None;
                 let mut quote__ = None;
                 let mut optional_escape__ = None;
+                let mut optional_comment__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::BaseConf => {
@@ -3820,6 +3834,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                             }
                             optional_escape__ = 
map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape);
                         }
+                        GeneratedField::Comment => {
+                            if optional_comment__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("comment"));
+                            }
+                            optional_comment__ = 
map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalComment::Comment);
+                        }
                     }
                 }
                 Ok(CsvScanExecNode {
@@ -3828,6 +3848,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                     delimiter: delimiter__.unwrap_or_default(),
                     quote: quote__.unwrap_or_default(),
                     optional_escape: optional_escape__,
+                    optional_comment: optional_comment__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 9741b2bc42..ca871a4a9d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1530,6 +1530,8 @@ pub struct CsvScanExecNode {
     pub quote: ::prost::alloc::string::String,
     #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
     pub optional_escape: 
::core::option::Option<csv_scan_exec_node::OptionalEscape>,
+    #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")]
+    pub optional_comment: 
::core::option::Option<csv_scan_exec_node::OptionalComment>,
 }
 /// Nested message and enum types in `CsvScanExecNode`.
 pub mod csv_scan_exec_node {
@@ -1539,6 +1541,12 @@ pub mod csv_scan_exec_node {
         #[prost(string, tag = "5")]
         Escape(::prost::alloc::string::String),
     }
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum OptionalComment {
+        #[prost(string, tag = "6")]
+        Comment(::prost::alloc::string::String),
+    }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 550176a42e..d0011e4917 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -203,6 +203,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 } else {
                     None
                 },
+                if let 
Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
+                    comment,
+                )) = &scan.optional_comment
+                {
+                    Some(str_to_byte(comment, "comment")?)
+                } else {
+                    None
+                },
                 FileCompressionType::UNCOMPRESSED,
             ))),
             #[cfg(feature = "parquet")]
@@ -1558,6 +1566,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                         } else {
                             None
                         },
+                        optional_comment: if let Some(comment) = 
exec.comment() {
+                            
Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
+                                byte_to_string(comment, "comment")?,
+                            ))
+                        } else {
+                            None
+                        },
                     },
                 )),
             });
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index f581fa9abc..8902b3eebf 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -202,3 +202,27 @@ select * from stored_table_with_necessary_quoting;
 2 f|f|f
 3 g|g|g
 4 h|h|h
+
+# Read CSV file with comments
+statement ok
+COPY (VALUES
+  ('column1,column2'),
+  ('#second line is a comment'),
+  ('2,3'))
+TO 'test_files/scratch/csv_files/file_with_comments.csv'
+OPTIONS ('format.delimiter' '|');
+
+statement ok
+CREATE EXTERNAL TABLE stored_table_with_comments (
+  c1 VARCHAR,
+  c2 VARCHAR
+) STORED AS CSV
+LOCATION 'test_files/scratch/csv_files/file_with_comments.csv'
+OPTIONS ('format.comment' '#',
+         'format.delimiter' ',');
+
+query TT
+SELECT * from stored_table_with_comments;
+----
+column1 column2
+2 3


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to