rdettai commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r838628823
##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
)
}
+fn read_partition_no_file_columns(
+ object_store: &dyn ObjectStore,
+ partition: &[PartitionedFile],
+ batch_size: usize,
+ response_tx: Sender<ArrowResult<RecordBatch>>,
+ mut limit: Option<usize>,
+ mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+ use parquet::file::reader::FileReader;
+ for partitioned_file in partition {
+ let object_reader =
+
object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+ let file_reader =
SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+ let mut file_rows: usize = file_reader
+ .metadata()
+ .file_metadata()
+ .num_rows()
+ .try_into()
+ .expect("Row count should always be greater than or equal to 0");
+ let remaining_rows = limit.unwrap_or(usize::MAX);
+ if file_rows >= remaining_rows {
+ file_rows = remaining_rows;
+ limit = Some(0);
+ } else if let Some(remaining_limit) = &mut limit {
+ *remaining_limit -= file_rows;
+ }
+
+ while file_rows > batch_size {
+ send_result(
+ &response_tx,
+ partition_column_projector
+ .project_empty(batch_size,
&partitioned_file.partition_values),
+ )?;
+ file_rows -= batch_size;
+ }
+ if file_rows != 0 {
+ send_result(
+ &response_tx,
+ partition_column_projector
+ .project_empty(batch_size,
&partitioned_file.partition_values),
+ )?;
+ }
+
+ if limit == Some(0) {
+ break;
+ }
+ }
+ Ok(())
+}
Review comment:
Right, error management in iterators can quickly become annoying! Then I
think the version with the loop is fine for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]