pjmore commented on a change in pull request #2000:
URL: https://github.com/apache/arrow-datafusion/pull/2000#discussion_r835813180



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -435,6 +448,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    mut limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    for partitioned_file in partition {
+        let object_reader =
+            
object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = 
SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut file_rows: usize = file_reader
+            .metadata()
+            .file_metadata()
+            .num_rows()
+            .try_into()
+            .expect("Row count should always be greater than or equal to 0");
+        let remaining_rows = limit.unwrap_or(usize::MAX);
+        if file_rows >= remaining_rows {
+            file_rows = remaining_rows;
+            limit = Some(0);
+        } else if let Some(remaining_limit) = &mut limit {
+            *remaining_limit -= file_rows;
+        }
+
+        while file_rows > batch_size {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, 
&partitioned_file.partition_values),
+            )?;
+            file_rows -= batch_size;
+        }
+        if file_rows != 0 {
+            send_result(
+                &response_tx,
+                partition_column_projector
+                    .project_empty(batch_size, 
&partitioned_file.partition_values),
+            )?;
+        }
+
+        if limit == Some(0) {
+            break;
+        }
+    }
+    Ok(())
+}

Review comment:
        I couldn't find a good way to implement what you suggested. The error 
handling when opening the file was the main issue that I ran into. I couldn't 
figure out another way to short-circuit both when the limit was met and when 
an error occurred. If you're okay with scanning all of the partition files 
even on an error, I'm okay with it; I just figured that for remote object 
stores that might be a bad idea. 
   
   ```
    // NOTE(review): illustrative alternative only — not compiling code as
    // quoted. It assumes `limit` is a plain `usize` here (the function above
    // uses `Option<usize>`) and relies on a `BatchSizeIter` helper that is
    // not shown in this diff; confirm both before adopting.
    // `res` captures the first open/read error seen inside the closure, since
    // `map` cannot itself short-circuit with `?`.
    let mut res: Result<()> = Ok(());
    let mut batch_size_partition_iter = partition.iter() 
        .map(|partitioned_file|{
            // Resolve this file's row count from its Parquet footer metadata;
            // on any error, record it in `res` and yield 0 rows so the
            // `take_while` below stops the scan.
            let mut num_rows: usize = match 
object_store.file_reader(partitioned_file.file_meta.sized_file.clone()){
                Ok(object_reader) => {
                    match 
SerializedFileReader::new(ChunkObjectReader(object_reader)){
                        Ok(file_reader) => {
                            file_reader
                                .metadata()
                                .file_metadata()
                                .num_rows()
                                .try_into()
                                .expect("Row count should always be greater 
than or equal to 0 and less than usize::MAX")
                        },
                        Err(e) =>{
                            res = Err(e.into());
                            0
                        },
                    }
                },
                Err(e) => {
                    res = Err(e);
                    0
                },
            };            
            // Cap by the remaining limit and deduct what this file emits.
            num_rows = limit.min(num_rows); 
            limit -= num_rows;
            (num_rows, partitioned_file.partition_values.as_slice())
        })
        // A 0-row entry means either the limit was met or an error occurred;
        // both cases stop iteration (short-circuit).
        .take_while(|(num_rows, _)| *num_rows != 0)
        // Split each file's row count into per-batch sizes, pairing every
        // batch with that file's partition values.
        .flat_map(|(num_rows, partition_values)| 
BatchSizeIter::new(num_rows, 
batch_size).zip(std::iter::repeat(partition_values)));
        // Send each batch; `try_for_each` propagates the first send error.
        Iterator::try_for_each(&mut batch_size_partition_iter,|(batch_size, 
partition_values)| {
            send_result(&response_tx, 
partition_column_projector.project_empty(batch_size, partition_values))
        })?;
        // Surface any deferred open/read error recorded above.
        res?;
        Ok(())
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to