tustvold commented on code in PR #4525:
URL: https://github.com/apache/arrow-datafusion/pull/4525#discussion_r1041198101
##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -138,29 +138,37 @@ where
{
let delimiter = LineDelimiter::new();
- futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async
move {
- loop {
- if let Some(next) = delimiter.next() {
- return Some((Ok(next), (s, delimiter)));
- }
+ futures::stream::unfold(
+ (s, delimiter, false),
+ |(mut s, mut delimiter, mut exhausted)| async move {
+ loop {
+ if let Some(next) = delimiter.next() {
+ return Some((Ok(next), (s, delimiter, exhausted)));
+ } else if exhausted {
+ return None;
+ }
- match s.next().await {
- Some(Ok(bytes)) => delimiter.push(bytes),
- Some(Err(e)) => return Some((Err(e), (s, delimiter))),
- None => match delimiter.finish() {
- Ok(true) => return None,
- Ok(false) => continue,
- Err(e) => return Some((Err(e), (s, delimiter))),
- },
+ match s.next().await {
+ Some(Ok(bytes)) => delimiter.push(bytes),
+ Some(Err(e)) => return Some((Err(e), (s, delimiter,
exhausted))),
+ None => {
+ exhausted = true;
Review Comment:
I see this is to handle the case where the final record is missing its
trailing newline terminator, for which there is a test added below :+1:
##########
datafusion/core/src/physical_plan/file_format/delimited_stream.rs:
##########
@@ -209,6 +217,31 @@ mod tests {
futures::stream::iter(input.into_iter().map(|s|
Ok(Bytes::from(s))));
let stream = newline_delimited_stream(input_stream);
+ let results: Vec<_> = stream.try_collect().await.unwrap();
+ assert_eq!(
+ results,
+ vec![
+ Bytes::from("hello\nworld\n"),
+ Bytes::from("bingo\n"),
+ Bytes::from("cupcakes")
Review Comment:
:+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]