This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5912025591 Add support for reading CSV files with comments (#10467)
5912025591 is described below
commit 59120255916bcd624161ce8f5df255f2cc838406
Author: Benjamin Bannier <[email protected]>
AuthorDate: Mon Jun 10 17:31:42 2024 +0200
Add support for reading CSV files with comments (#10467)
This patch adds support for parsing CSV files containing comment lines.
Closes #10262.
---
datafusion-examples/examples/csv_opener.rs | 1 +
datafusion/common/src/config.rs | 1 +
datafusion/core/src/datasource/file_format/csv.rs | 13 +++++++++++-
.../core/src/datasource/file_format/options.rs | 10 +++++++++
.../core/src/datasource/physical_plan/csv.rs | 22 ++++++++++++++++++++
.../src/physical_optimizer/enforce_distribution.rs | 3 +++
.../src/physical_optimizer/projection_pushdown.rs | 3 +++
.../replace_with_order_preserving_variants.rs | 1 +
datafusion/core/src/test/mod.rs | 3 +++
.../proto-common/proto/datafusion_common.proto | 1 +
datafusion/proto-common/src/from_proto/mod.rs | 1 +
datafusion/proto-common/src/generated/pbjson.rs | 20 ++++++++++++++++++
datafusion/proto-common/src/generated/prost.rs | 3 +++
datafusion/proto-common/src/to_proto/mod.rs | 1 +
datafusion/proto/proto/datafusion.proto | 3 +++
.../proto/src/generated/datafusion_proto_common.rs | 3 +++
datafusion/proto/src/generated/pbjson.rs | 21 +++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 8 ++++++++
datafusion/proto/src/physical_plan/mod.rs | 15 ++++++++++++++
datafusion/sqllogictest/test_files/csv_files.slt | 24 ++++++++++++++++++++++
20 files changed, 156 insertions(+), 1 deletion(-)
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index d02aa9b308..1f45026a21 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -48,6 +48,7 @@ async fn main() -> Result<()> {
b',',
b'"',
object_store,
+ Some(b'#'),
);
let opener = CsvOpener::new(Arc::new(config),
FileCompressionType::UNCOMPRESSED);
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index a4f937b6e2..1c431d04cd 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1567,6 +1567,7 @@ config_namespace! {
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
pub null_value: Option<String>, default = None
+ pub comment: Option<u8>, default = None
}
}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 60192b7f77..2139b35621 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -147,6 +147,12 @@ impl CsvFormat {
self.options.has_header
}
+ /// Set the comment byte: lines beginning with this byte are ignored (`None` means no comment byte is configured).
+ pub fn with_comment(mut self, comment: Option<u8>) -> Self {
+ self.options.comment = comment;
+ self
+ }
+
/// The character separating values within a row.
/// - default to ','
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
@@ -252,6 +258,7 @@ impl FileFormat for CsvFormat {
self.options.delimiter,
self.options.quote,
self.options.escape,
+ self.options.comment,
self.options.compression.into(),
);
Ok(Arc::new(exec))
@@ -300,7 +307,7 @@ impl CsvFormat {
pin_mut!(stream);
while let Some(chunk) = stream.next().await.transpose()? {
- let format = arrow::csv::reader::Format::default()
+ let mut format = arrow::csv::reader::Format::default()
.with_header(
first_chunk
&& self
@@ -310,6 +317,10 @@ impl CsvFormat {
)
.with_delimiter(self.options.delimiter);
+ if let Some(comment) = self.options.comment {
+ format = format.with_comment(comment);
+ }
+
let (Schema { fields, .. }, records_read) =
format.infer_schema(chunk.reader(), Some(records_to_read))?;
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index f5bd72495d..c6d143ed67 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> {
pub quote: u8,
/// An optional escape character. Defaults to None.
pub escape: Option<u8>,
+ /// If enabled, lines beginning with this byte are ignored.
+ pub comment: Option<u8>,
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
@@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> {
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
file_sort_order: vec![],
+ comment: None,
}
}
@@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Specify the comment byte to use for CSV read; lines beginning with this byte are ignored
+ pub fn comment(mut self, comment: u8) -> Self {
+ self.comment = Some(comment);
+ self
+ }
+
/// Specify delimiter to use for CSV read
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
@@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
let file_format = CsvFormat::default()
.with_options(table_options.csv)
.with_has_header(self.has_header)
+ .with_comment(self.comment)
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 8203e414de..c06c630c45 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -58,6 +58,7 @@ pub struct CsvExec {
delimiter: u8,
quote: u8,
escape: Option<u8>,
+ comment: Option<u8>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
@@ -73,6 +74,7 @@ impl CsvExec {
delimiter: u8,
quote: u8,
escape: Option<u8>,
+ comment: Option<u8>,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics,
projected_output_ordering) =
@@ -92,6 +94,7 @@ impl CsvExec {
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
+ comment,
}
}
@@ -113,6 +116,11 @@ impl CsvExec {
self.quote
}
+ /// The comment byte, if any; lines beginning with this byte are ignored.
+ pub fn comment(&self) -> Option<u8> {
+ self.comment
+ }
+
/// The escape character
pub fn escape(&self) -> Option<u8> {
self.escape
@@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec {
quote: self.quote,
escape: self.escape,
object_store,
+ comment: self.comment,
});
let opener = CsvOpener {
@@ -265,9 +274,11 @@ pub struct CsvConfig {
quote: u8,
escape: Option<u8>,
object_store: Arc<dyn ObjectStore>,
+ comment: Option<u8>,
}
impl CsvConfig {
+ #[allow(clippy::too_many_arguments)]
/// Returns a [`CsvConfig`]
pub fn new(
batch_size: usize,
@@ -277,6 +288,7 @@ impl CsvConfig {
delimiter: u8,
quote: u8,
object_store: Arc<dyn ObjectStore>,
+ comment: Option<u8>,
) -> Self {
Self {
batch_size,
@@ -287,6 +299,7 @@ impl CsvConfig {
quote,
escape: None,
object_store,
+ comment,
}
}
}
@@ -309,6 +322,9 @@ impl CsvConfig {
if let Some(escape) = self.escape {
builder = builder.with_escape(escape)
}
+ if let Some(comment) = self.comment {
+ builder = builder.with_comment(comment);
+ }
builder
}
@@ -570,6 +586,7 @@ mod tests {
b',',
b'"',
None,
+ None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -636,6 +653,7 @@ mod tests {
b',',
b'"',
None,
+ None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -702,6 +720,7 @@ mod tests {
b',',
b'"',
None,
+ None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -765,6 +784,7 @@ mod tests {
b',',
b'"',
None,
+ None,
file_compression_type.to_owned(),
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
@@ -827,6 +847,7 @@ mod tests {
b',',
b'"',
None,
+ None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -921,6 +942,7 @@ mod tests {
b',',
b'"',
None,
+ None,
file_compression_type.to_owned(),
);
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 88fa3a978a..f7c2aee578 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1471,6 +1471,7 @@ pub(crate) mod tests {
b',',
b'"',
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -1494,6 +1495,7 @@ pub(crate) mod tests {
b',',
b'"',
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -3767,6 +3769,7 @@ pub(crate) mod tests {
b',',
b'"',
None,
+ None,
compression_type,
)),
vec![("a".to_string(), "a".to_string())],
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 335632ab6e..70524dfcea 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -185,6 +185,7 @@ fn try_swapping_with_csv(
csv.delimiter(),
csv.quote(),
csv.escape(),
+ csv.comment(),
csv.file_compression_type,
)) as _
})
@@ -1686,6 +1687,7 @@ mod tests {
0,
0,
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -1708,6 +1710,7 @@ mod tests {
0,
0,
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index e3ef3b95aa..013155b840 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -1502,6 +1502,7 @@ mod tests {
0,
b'"',
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index f76e1bb60b..e91f83f119 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
b',',
b'"',
None,
+ None,
FileCompressionType::UNCOMPRESSED,
)))
}
@@ -282,6 +283,7 @@ pub fn csv_exec_sorted(
0,
0,
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -337,6 +339,7 @@ pub fn csv_exec_ordered(
0,
b'"',
None,
+ None,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index d9ec7dbb51..29a348283f 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -397,6 +397,7 @@ message CsvOptions {
string timestamp_tz_format = 10; // Optional timestamp with timezone format
string time_format = 11; // Optional time format
string null_value = 12; // Optional representation of null value
+ bytes comment = 13; // Optional comment character as a byte
}
// Options controlling CSV format
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index 3ae70318fa..25c1502ee7 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -867,6 +867,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
.then(|| proto_opts.time_format.clone()),
null_value: (!proto_opts.null_value.is_empty())
.then(|| proto_opts.null_value.clone()),
+ comment: proto_opts.comment.first().copied(),
})
}
}
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index 6b23724336..6f8409b82a 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1850,6 +1850,9 @@ impl serde::Serialize for CsvOptions {
if !self.null_value.is_empty() {
len += 1;
}
+ if !self.comment.is_empty() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
if !self.has_header.is_empty() {
#[allow(clippy::needless_borrow)]
@@ -1894,6 +1897,10 @@ impl serde::Serialize for CsvOptions {
if !self.null_value.is_empty() {
struct_ser.serialize_field("nullValue", &self.null_value)?;
}
+ if !self.comment.is_empty() {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("comment",
pbjson::private::base64::encode(&self.comment).as_str())?;
+ }
struct_ser.end()
}
}
@@ -1924,6 +1931,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"timeFormat",
"null_value",
"nullValue",
+ "comment",
];
#[allow(clippy::enum_variant_names)]
@@ -1940,6 +1948,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
TimestampTzFormat,
TimeFormat,
NullValue,
+ Comment,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -1973,6 +1982,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"timestampTzFormat" | "timestamp_tz_format" =>
Ok(GeneratedField::TimestampTzFormat),
"timeFormat" | "time_format" =>
Ok(GeneratedField::TimeFormat),
"nullValue" | "null_value" =>
Ok(GeneratedField::NullValue),
+ "comment" => Ok(GeneratedField::Comment),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -2004,6 +2014,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
let mut timestamp_tz_format__ = None;
let mut time_format__ = None;
let mut null_value__ = None;
+ let mut comment__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::HasHeader => {
@@ -2088,6 +2099,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
}
null_value__ = Some(map_.next_value()?);
}
+ GeneratedField::Comment => {
+ if comment__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("comment"));
+ }
+ comment__ =
+
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+ ;
+ }
}
}
Ok(CsvOptions {
@@ -2103,6 +2122,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
timestamp_tz_format:
timestamp_tz_format__.unwrap_or_default(),
time_format: time_format__.unwrap_or_default(),
null_value: null_value__.unwrap_or_default(),
+ comment: comment__.unwrap_or_default(),
})
}
}
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 48da143bc7..ff17a40738 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -608,6 +608,9 @@ pub struct CsvOptions {
/// Optional representation of null value
#[prost(string, tag = "12")]
pub null_value: ::prost::alloc::string::String,
+ /// Optional comment character as a byte
+ #[prost(bytes = "vec", tag = "13")]
+ pub comment: ::prost::alloc::vec::Vec<u8>,
}
/// Options controlling CSV format
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index 28f6952aac..8e7ee9a7d6 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -894,6 +894,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
timestamp_tz_format:
opts.timestamp_tz_format.clone().unwrap_or_default(),
time_format: opts.time_format.clone().unwrap_or_default(),
null_value: opts.null_value.clone().unwrap_or_default(),
+ comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
})
}
}
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 9f23824b3a..df1859e25c 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -994,6 +994,9 @@ message CsvScanExecNode {
oneof optional_escape {
string escape = 5;
}
+ oneof optional_comment {
+ string comment = 6;
+ }
}
message AvroScanExecNode {
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 48da143bc7..ff17a40738 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -608,6 +608,9 @@ pub struct CsvOptions {
/// Optional representation of null value
#[prost(string, tag = "12")]
pub null_value: ::prost::alloc::string::String,
+ /// Optional comment character as a byte
+ #[prost(bytes = "vec", tag = "13")]
+ pub comment: ::prost::alloc::vec::Vec<u8>,
}
/// Options controlling CSV format
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 28f80c5ee1..4d0e2d93f9 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3692,6 +3692,9 @@ impl serde::Serialize for CsvScanExecNode {
if self.optional_escape.is_some() {
len += 1;
}
+ if self.optional_comment.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.CsvScanExecNode", len)?;
if let Some(v) = self.base_conf.as_ref() {
struct_ser.serialize_field("baseConf", v)?;
@@ -3712,6 +3715,13 @@ impl serde::Serialize for CsvScanExecNode {
}
}
}
+ if let Some(v) = self.optional_comment.as_ref() {
+ match v {
+ csv_scan_exec_node::OptionalComment::Comment(v) => {
+ struct_ser.serialize_field("comment", v)?;
+ }
+ }
+ }
struct_ser.end()
}
}
@@ -3729,6 +3739,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
"delimiter",
"quote",
"escape",
+ "comment",
];
#[allow(clippy::enum_variant_names)]
@@ -3738,6 +3749,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
Delimiter,
Quote,
Escape,
+ Comment,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -3764,6 +3776,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
"delimiter" => Ok(GeneratedField::Delimiter),
"quote" => Ok(GeneratedField::Quote),
"escape" => Ok(GeneratedField::Escape),
+ "comment" => Ok(GeneratedField::Comment),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -3788,6 +3801,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
let mut delimiter__ = None;
let mut quote__ = None;
let mut optional_escape__ = None;
+ let mut optional_comment__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::BaseConf => {
@@ -3820,6 +3834,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
}
optional_escape__ =
map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape);
}
+ GeneratedField::Comment => {
+ if optional_comment__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("comment"));
+ }
+ optional_comment__ =
map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalComment::Comment);
+ }
}
}
Ok(CsvScanExecNode {
@@ -3828,6 +3848,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
delimiter: delimiter__.unwrap_or_default(),
quote: quote__.unwrap_or_default(),
optional_escape: optional_escape__,
+ optional_comment: optional_comment__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 9741b2bc42..ca871a4a9d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1530,6 +1530,8 @@ pub struct CsvScanExecNode {
pub quote: ::prost::alloc::string::String,
#[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
pub optional_escape:
::core::option::Option<csv_scan_exec_node::OptionalEscape>,
+ #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")]
+ pub optional_comment:
::core::option::Option<csv_scan_exec_node::OptionalComment>,
}
/// Nested message and enum types in `CsvScanExecNode`.
pub mod csv_scan_exec_node {
@@ -1539,6 +1541,12 @@ pub mod csv_scan_exec_node {
#[prost(string, tag = "5")]
Escape(::prost::alloc::string::String),
}
+ #[allow(clippy::derive_partial_eq_without_eq)]
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum OptionalComment {
+ #[prost(string, tag = "6")]
+ Comment(::prost::alloc::string::String),
+ }
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 550176a42e..d0011e4917 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -203,6 +203,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
} else {
None
},
+ if let
Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
+ comment,
+ )) = &scan.optional_comment
+ {
+ Some(str_to_byte(comment, "comment")?)
+ } else {
+ None
+ },
FileCompressionType::UNCOMPRESSED,
))),
#[cfg(feature = "parquet")]
@@ -1558,6 +1566,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
} else {
None
},
+ optional_comment: if let Some(comment) =
exec.comment() {
+
Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
+ byte_to_string(comment, "comment")?,
+ ))
+ } else {
+ None
+ },
},
)),
});
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index f581fa9abc..8902b3eebf 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -202,3 +202,27 @@ select * from stored_table_with_necessary_quoting;
2 f|f|f
3 g|g|g
4 h|h|h
+
+# Read CSV file with comments
+statement ok
+COPY (VALUES
+ ('column1,column2'),
+ ('#second line is a comment'),
+ ('2,3'))
+TO 'test_files/scratch/csv_files/file_with_comments.csv'
+OPTIONS ('format.delimiter' '|');
+
+statement ok
+CREATE EXTERNAL TABLE stored_table_with_comments (
+ c1 VARCHAR,
+ c2 VARCHAR
+) STORED AS CSV
+LOCATION 'test_files/scratch/csv_files/file_with_comments.csv'
+OPTIONS ('format.comment' '#',
+ 'format.delimiter' ',');
+
+query TT
+SELECT * from stored_table_with_comments;
+----
+column1 column2
+2 3
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]