rdettai commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r834021207
##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
)
}
+fn read_partition_no_file_columns(
+ object_store: &dyn ObjectStore,
+ partition: &[PartitionedFile],
+ batch_size: usize,
+ response_tx: Sender<ArrowResult<RecordBatch>>,
+ mut limit: Option<usize>,
+ mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+ use parquet::file::reader::FileReader;
+ for partitioned_file in partition {
+ let object_reader =
+ object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+ let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+ let mut file_rows: usize = file_reader
+ .metadata()
+ .file_metadata()
+ .num_rows()
+ .try_into()
+ .expect("Row count should always be greater than or equal to 0");
+ let remaining_rows = limit.unwrap_or(usize::MAX);
+ if file_rows >= remaining_rows {
+ file_rows = remaining_rows;
+ limit = Some(0);
+ } else if let Some(remaining_limit) = &mut limit {
+ *remaining_limit -= file_rows;
+ }
Review comment:
Unwrapping the `limit` `Option` once, outside the loop, would avoid having to
mutate `limit` itself and would reduce the complexity a bit more.
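
A minimal sketch of that idea, not the PR's actual code: `rows_to_emit` is a hypothetical helper that works on plain row counts instead of real Parquet files, so it only illustrates the bookkeeping. The limit is unwrapped once into a plain `usize` counter, with `usize::MAX` standing in for "no limit", and the loop never touches the `Option` again.

```rust
/// Hypothetical helper (not part of DataFusion): given the row count of each
/// file and an optional limit, return how many rows to emit per file.
fn rows_to_emit(file_row_counts: &[usize], limit: Option<usize>) -> Vec<usize> {
    // Unwrap the Option once, outside the loop. `usize::MAX` means "no limit".
    let mut remaining = limit.unwrap_or(usize::MAX);
    let mut emitted_per_file = Vec::new();

    for &file_rows in file_row_counts {
        // Stop as soon as the limit has been used up.
        if remaining == 0 {
            break;
        }
        // Emit the whole file, or only what is left of the limit.
        let emitted = file_rows.min(remaining);
        remaining -= emitted;
        emitted_per_file.push(emitted);
    }
    emitted_per_file
}
```

With row counts `[10, 20, 30]` and a limit of 25, this emits 10 rows from the first file, 15 from the second, and never opens the third.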
##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -293,6 +293,36 @@ impl PartitionColumnProjector {
}
}
+ // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+ fn project_empty(
Review comment:
What about `project_from_size` instead?
##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
)
}
+fn read_partition_no_file_columns(
+ object_store: &dyn ObjectStore,
+ partition: &[PartitionedFile],
+ batch_size: usize,
+ response_tx: Sender<ArrowResult<RecordBatch>>,
+ mut limit: Option<usize>,
+ mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+ use parquet::file::reader::FileReader;
+ for partitioned_file in partition {
+ let object_reader =
+ object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+ let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+ let mut file_rows: usize = file_reader
+ .metadata()
+ .file_metadata()
+ .num_rows()
+ .try_into()
+ .expect("Row count should always be greater than or equal to 0");
+ let remaining_rows = limit.unwrap_or(usize::MAX);
+ if file_rows >= remaining_rows {
+ file_rows = remaining_rows;
+ limit = Some(0);
+ } else if let Some(remaining_limit) = &mut limit {
+ *remaining_limit -= file_rows;
+ }
+
+ while file_rows > batch_size {
+ send_result(
+ &response_tx,
+ partition_column_projector
+ .project_empty(batch_size, &partitioned_file.partition_values),
+ )?;
+ file_rows -= batch_size;
+ }
+ if file_rows != 0 {
+ send_result(
+ &response_tx,
+ partition_column_projector
+ .project_empty(batch_size, &partitioned_file.partition_values),
+ )?;
+ }
+
+ if limit == Some(0) {
+ break;
+ }
+ }
+ Ok(())
+}
Review comment:
I still feel this could be simplified and made more readable by using
more iterators:
- iterate over the files
- map each file to its size (row count)
- map each size to an iterator that repeats the batch size file_rows/batch_size times, followed by the residual
- flat map the whole thing
- apply the limit with `take(limit)`
- send each batch with `for_each(send)`
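
The steps above could be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `batch_sizes` and `plan_batches` are hypothetical names, the files are represented only by their row counts, and the batch sizes are collected into a `Vec` where the real code would call `send_result`. One deviation from the list: a plain `take(limit)` on the flattened iterator would cap the number of batches rather than the number of rows, so the sketch tracks the remaining rows with `map_while` instead. It also assumes `batch_size > 0`.

```rust
/// Hypothetical helper: split `rows` into full batches of `batch_size`
/// followed by one residual batch, if any. Assumes `batch_size > 0`.
fn batch_sizes(rows: usize, batch_size: usize) -> impl Iterator<Item = usize> {
    let full_batches = rows / batch_size;
    let residual = rows % batch_size;
    std::iter::repeat(batch_size)
        .take(full_batches)
        // An `Option` is iterable: it yields the residual only when non-zero.
        .chain(Some(residual).filter(|r| *r > 0))
}

/// Hypothetical helper: the size of every batch that would be sent for the
/// given files, honouring an optional row limit.
fn plan_batches(
    file_row_counts: &[usize],
    batch_size: usize,
    limit: Option<usize>,
) -> Vec<usize> {
    // Rows still allowed by the limit; `usize::MAX` means "no limit".
    let mut remaining = limit.unwrap_or(usize::MAX);

    file_row_counts
        .iter()
        // Map each file's row count to its batch sizes and flatten.
        .flat_map(|&rows| batch_sizes(rows, batch_size))
        // Apply the limit in rows: shrink the last batch, then stop.
        .map_while(|size| {
            if remaining == 0 {
                return None;
            }
            let emitted = size.min(remaining);
            remaining -= emitted;
            Some(emitted)
        })
        .collect()
}
```

For two files of 3 rows each, a batch size of 2, and a limit of 4, the planned batches are 2, 1, and 1 rows: the second file's first batch is truncated to the single row the limit still allows.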