alamb commented on code in PR #5056:
URL: https://github.com/apache/arrow-datafusion/pull/5056#discussion_r1091053939
##########
datafusion/core/src/avro_to_arrow/schema.rs:
##########
@@ -217,6 +217,9 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::Union(_, _, _) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
+ DataType::RunEndEncoded(_, _) => {
+ unimplemented!("RunEndEncoded support not implemented")
Review Comment:

##########
datafusion/common/src/error.rs:
##########
@@ -388,46 +388,23 @@ impl DataFusionError {
let mut last_datafusion_error = self;
let mut root_error: &dyn Error = self;
- while let Some(source) = find_source(root_error) {
+ while let Some(source) = root_error.source() {
// walk the next level
root_error = source;
// remember the lowest datafusion error so far
if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
last_datafusion_error = e;
+ } else if let Some(e) =
root_error.downcast_ref::<Arc<DataFusionError>>() {
+ // As `Arc<T>::source()` calls through to `T::source()` we
need to
Review Comment:
👍
##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -21,7 +21,6 @@ use
crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::expressions::PhysicalSortExpr;
-use
crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
Review Comment:
Can the corresponding `newline_delimited_stream` module be deleted too?
https://github.com/search?q=repo%3Aapache%2Farrow-datafusion%20newline_delimited_stream&type=code
##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -224,20 +240,38 @@ impl FileOpener for CsvOpener {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
- Ok(futures::stream::iter(config.open(decoder,
true)).boxed())
+ Ok(futures::stream::iter(config.open(decoder)).boxed())
}
GetResult::Stream(s) => {
- let mut first_chunk = true;
- let s = s.map_err(Into::<DataFusionError>::into);
- let decoder = file_compression_type.convert_stream(s)?;
- Ok(newline_delimited_stream(decoder)
- .map_ok(move |bytes| {
- let reader = config.open(bytes.reader(),
first_chunk);
- first_chunk = false;
- futures::stream::iter(reader)
- })
- .try_flatten()
- .boxed())
+ let mut decoder = config.builder().build_decoder();
+ let s = s.map_err(DataFusionError::from);
+ let mut input =
file_compression_type.convert_stream(s)?.fuse();
+ let mut buffered = Bytes::new();
+
+ let s = futures::stream::poll_fn(move |cx| {
+ loop {
+ if buffered.is_empty() {
+ buffered = match
ready!(input.poll_next_unpin(cx)) {
+ Some(Ok(b)) => b,
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(e.into())))
+ }
+ None => break,
+ };
+ }
+ let decoded = match
decoder.decode(buffered.as_ref()) {
+ // Note: the decoder needs to be called with
an empty
+ // array to delimt the final record
Review Comment:
```suggestion
// array to delimit the final record
```
##########
datafusion/proto/src/logical_plan/to_proto.rs:
##########
@@ -218,7 +218,7 @@ impl TryFrom<&DataType> for
protobuf::arrow_type::ArrowTypeEnum {
DataType::Decimal256(_, _) => {
return Err(Error::General("Proto serialization error: The
Decimal256 data type is not yet supported".to_owned()))
}
- DataType::Map(_, _) => {
+ DataType::Map(_, _) | DataType::RunEndEncoded(_, _) => {
Review Comment:
I recommend either updating the error message here or adding a separate
clause for RunEndEncoded
##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -224,20 +240,38 @@ impl FileOpener for CsvOpener {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
- Ok(futures::stream::iter(config.open(decoder,
true)).boxed())
+ Ok(futures::stream::iter(config.open(decoder)).boxed())
}
GetResult::Stream(s) => {
- let mut first_chunk = true;
- let s = s.map_err(Into::<DataFusionError>::into);
- let decoder = file_compression_type.convert_stream(s)?;
- Ok(newline_delimited_stream(decoder)
- .map_ok(move |bytes| {
- let reader = config.open(bytes.reader(),
first_chunk);
- first_chunk = false;
- futures::stream::iter(reader)
- })
- .try_flatten()
- .boxed())
+ let mut decoder = config.builder().build_decoder();
+ let s = s.map_err(DataFusionError::from);
+ let mut input =
file_compression_type.convert_stream(s)?.fuse();
+ let mut buffered = Bytes::new();
+
+ let s = futures::stream::poll_fn(move |cx| {
+ loop {
+ if buffered.is_empty() {
+ buffered = match
ready!(input.poll_next_unpin(cx)) {
+ Some(Ok(b)) => b,
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(e.into())))
+ }
+ None => break,
+ };
+ }
+ let decoded = match
decoder.decode(buffered.as_ref()) {
+ // Note: the decoder needs to be called with
an empty
+ // array to delimt the final record
Review Comment:
I must be missing how the code is called with an empty buffer. If all data
in buffered was consumed and then the next poll was empty, won't that break out
of the loop prior to calling `decode()` 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]