alamb commented on code in PR #5056: URL: https://github.com/apache/arrow-datafusion/pull/5056#discussion_r1091053939
########## datafusion/core/src/avro_to_arrow/schema.rs: ########## @@ -217,6 +217,9 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Union(_, _, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), + DataType::RunEndEncoded(_, _) => { + unimplemented!("RunEndEncoded support not implemented") Review Comment:  ########## datafusion/common/src/error.rs: ########## @@ -388,46 +388,23 @@ impl DataFusionError { let mut last_datafusion_error = self; let mut root_error: &dyn Error = self; - while let Some(source) = find_source(root_error) { + while let Some(source) = root_error.source() { // walk the next level root_error = source; // remember the lowest datafusion error so far if let Some(e) = root_error.downcast_ref::<DataFusionError>() { last_datafusion_error = e; + } else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() { + // As `Arc<T>::source()` calls through to `T::source()` we need to Review Comment: 👍 ########## datafusion/core/src/physical_plan/file_format/csv.rs: ########## @@ -21,7 +21,6 @@ use crate::datasource::file_format::file_type::FileCompressionType; use crate::error::{DataFusionError, Result}; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream; Review Comment: Can the corresponding `newline_delimited_stream` module be deleted too? https://github.com/search?q=repo%3Aapache%2Farrow-datafusion%20newline_delimited_stream&type=code ########## datafusion/core/src/physical_plan/file_format/csv.rs: ########## @@ -224,20 +240,38 @@ impl FileOpener for CsvOpener { match config.object_store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; - Ok(futures::stream::iter(config.open(decoder, true)).boxed()) + Ok(futures::stream::iter(config.open(decoder)).boxed()) } GetResult::Stream(s) => { - let mut first_chunk = true; - let s = s.map_err(Into::<DataFusionError>::into); - let decoder = file_compression_type.convert_stream(s)?; - Ok(newline_delimited_stream(decoder) - .map_ok(move |bytes| { - let reader = config.open(bytes.reader(), first_chunk); - first_chunk = false; - futures::stream::iter(reader) - }) - .try_flatten() - .boxed()) + let mut decoder = config.builder().build_decoder(); + let s = s.map_err(DataFusionError::from); + let mut input = file_compression_type.convert_stream(s)?.fuse(); + let mut buffered = Bytes::new(); + + let s = futures::stream::poll_fn(move |cx| { + loop { + if buffered.is_empty() { + buffered = match ready!(input.poll_next_unpin(cx)) { + Some(Ok(b)) => b, + Some(Err(e)) => { + return Poll::Ready(Some(Err(e.into()))) + } + None => break, + }; + } + let decoded = match decoder.decode(buffered.as_ref()) { + // Note: the decoder needs to be called with an empty + // array to delimt the final record Review Comment: ```suggestion // array to delimit the final record ``` ########## datafusion/proto/src/logical_plan/to_proto.rs: ########## @@ -218,7 +218,7 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum { DataType::Decimal256(_, _) => { return Err(Error::General("Proto serialization error: The Decimal256 data type is not yet supported".to_owned())) } - DataType::Map(_, _) => { + DataType::Map(_, _) | DataType::RunEndEncoded(_, _) => { Review Comment: I recommend either updating the error message here or adding a separate clause for RunEndEncoded ########## datafusion/core/src/physical_plan/file_format/csv.rs: ########## @@ -224,20 +240,38 @@ impl FileOpener for CsvOpener { match config.object_store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; - Ok(futures::stream::iter(config.open(decoder, true)).boxed()) + Ok(futures::stream::iter(config.open(decoder)).boxed()) } GetResult::Stream(s) => { - let mut first_chunk = true; - let s = s.map_err(Into::<DataFusionError>::into); - let decoder = file_compression_type.convert_stream(s)?; - Ok(newline_delimited_stream(decoder) - .map_ok(move |bytes| { - let reader = config.open(bytes.reader(), first_chunk); - first_chunk = false; - futures::stream::iter(reader) - }) - .try_flatten() - .boxed()) + let mut decoder = config.builder().build_decoder(); + let s = s.map_err(DataFusionError::from); + let mut input = file_compression_type.convert_stream(s)?.fuse(); + let mut buffered = Bytes::new(); + + let s = futures::stream::poll_fn(move |cx| { + loop { + if buffered.is_empty() { + buffered = match ready!(input.poll_next_unpin(cx)) { + Some(Ok(b)) => b, + Some(Err(e)) => { + return Poll::Ready(Some(Err(e.into()))) + } + None => break, + }; + } + let decoded = match decoder.decode(buffered.as_ref()) { + // Note: the decoder needs to be called with an empty + // array to delimt the final record Review Comment: I must be missing how the code is called with an empty buffer. If all data in buffered was consumed and then the next poll was empty, won't that break out of the loop prior to calling `decode()` 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org