This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 8c7fb0a fix: fix incorrect splitting with line delimited streaming (#700)
8c7fb0a is described below
commit 8c7fb0a1e2fc794e375b205eab26b73d31032e2d
Author: bboissin <[email protected]>
AuthorDate: Tue Jun 2 11:53:17 2026 +0200
fix: fix incorrect splitting with line delimited streaming (#700)
* fix: fix incorrect splitting with line delimited streaming
In some cases, valid CSV in datafusion would return:
`Generic { store: "LineDelimiter", source: UnterminatedString }` due to
incorrect logic.
record_ends is a double-ended iterator, so when calling next_back() the
quoting/escaping logic would run in reverse, corrupting the internal state.
* Use last instead of collecting into vec
* Clippy
---------
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
src/delimited.rs | 36 +++++++++++++++++++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
diff --git a/src/delimited.rs b/src/delimited.rs
index b9f8842..dbdac39 100644
--- a/src/delimited.rs
+++ b/src/delimited.rs
@@ -109,7 +109,7 @@ impl LineDelimiter {
}
},
};
- let end_offset = record_ends.next_back().unwrap_or(start_offset);
+ let end_offset = record_ends.last().unwrap_or(start_offset);
if start_offset != end_offset {
self.complete.push_back(val.slice(start_offset..end_offset));
}
@@ -270,4 +270,38 @@ mod tests {
]
)
}
+
+ #[tokio::test]
+ async fn test_delimiter_quotes_stream() {
+ let input = vec!["x,y,z\n,\"new\nline\",\"with ", "space\""];
+ let input_stream =
+ futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
+ let stream = newline_delimited_stream(input_stream);
+
+ let results: Vec<_> = stream.try_collect().await.unwrap();
+ assert_eq!(
+ results,
+ vec![
+ Bytes::from("x,y,z\n"),
+ Bytes::from(",\"new\nline\",\"with space\"")
+ ]
+ )
+ }
+
+ #[tokio::test]
+ async fn test_delimiter_escape_stream() {
+ let input = vec!["hello\n\n\"\\ttabulated\"", "world"];
+ let input_stream =
+ futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
+ let stream = newline_delimited_stream(input_stream);
+
+ let results: Vec<_> = stream.try_collect().await.unwrap();
+ assert_eq!(
+ results,
+ vec![
+ Bytes::from("hello\n\n"),
+ Bytes::from("\"\\ttabulated\"world")
+ ]
+ )
+ }
}