metesynnada commented on code in PR #4661:
URL: https://github.com/apache/arrow-datafusion/pull/4661#discussion_r1051579682
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -120,27 +123,92 @@ impl FileFormat for CsvFormat {
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
-        for object in objects {
-            let data = store
+        'iterating_objects: for object in objects {
+            // stream to only read as many rows as needed into memory
+            let stream = store
                 .get(&object.location)
-                .and_then(|r| r.bytes())
-                .await
-                .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
-            let decoder =
-                self.file_compression_type.convert_read(data.reader())?;
-            let (schema, records_read) =
-                arrow::csv::reader::infer_reader_schema(
-                    decoder,
-                    self.delimiter,
-                    Some(records_to_read),
-                    self.has_header,
-                )?;
-            schemas.push(schema.clone());
-            if records_read == 0 {
-                continue;
+                .await?
+                .into_stream()
+                .map_err(|e| DataFusionError::External(Box::new(e)));
+            let stream = newline_delimited_stream(stream);
+            pin_mut!(stream);
+
+            // the first chunk may have a header: use it to initialize the
+            // names vec, and use the types vec to record all types inferred
+            // across chunks
+            let (column_names, mut column_types): (Vec<_>, Vec<_>) =
+                if let Some(data) = stream.next().await.transpose()? {
Review Comment:
The first chunk does not need to be handled outside of the stream-consuming loop below. I understand how it differs from the other chunks, but that difference can be handled by keeping a small piece of state inside the loop. Merging the two paths should reduce the size of this change significantly.
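
For illustration, a rough sketch of the shape I have in mind: a single loop over all chunks, with an `Option` acting as the "first chunk seen yet?" state. `Chunk`, `infer_names_and_types`, and `merge_types` are hypothetical placeholders standing in for the inference logic already in this PR, not real APIs, and the inferred types are simplified to strings:

```rust
// Hypothetical placeholder for whatever chunk type the stream yields.
type Chunk = Vec<u8>;

// Placeholder: the PR would run the CSV reader over the chunk here.
fn infer_names_and_types(_chunk: &Chunk, _read_header: bool) -> (Vec<String>, Vec<String>) {
    (vec!["c0".to_string()], vec!["Utf8".to_string()])
}

// Placeholder: widen each accumulated type with the newly inferred one.
fn merge_types(accumulated: &mut [String], new: &[String]) {
    for (acc, n) in accumulated.iter_mut().zip(new) {
        if acc != n {
            *acc = "Utf8".to_string(); // fall back to the widest type
        }
    }
}

fn infer_schema(
    chunks: impl Iterator<Item = Chunk>,
    has_header: bool,
) -> (Vec<String>, Vec<String>) {
    // None until the first chunk has been consumed; this is the only state
    // needed to fold the "first chunk" special case into the loop.
    let mut column_names: Option<Vec<String>> = None;
    let mut column_types: Vec<String> = Vec::new();

    for chunk in chunks {
        let is_first = column_names.is_none();
        // Only the first chunk may carry the header row.
        let (names, types) = infer_names_and_types(&chunk, is_first && has_header);
        if is_first {
            column_names = Some(names);
            column_types = types;
        } else {
            merge_types(&mut column_types, &types);
        }
    }

    (column_names.unwrap_or_default(), column_types)
}
```

In the async version this would be the same `while let Some(data) = stream.next().await.transpose()?` loop as below, just without the separate read before it.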