tustvold commented on code in PR #3051:
URL: https://github.com/apache/arrow-datafusion/pull/3051#discussion_r939916343


##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -50,9 +53,8 @@ pub trait FileOpener: Unpin {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        range: Option<FileRange>,
-    ) -> FileOpenFuture;
+        file_meta: FileMeta,

Review Comment:
   Why a new struct and not just `PartitionedFile`?



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -390,6 +433,63 @@ impl AsyncFileReader for ParquetFileReader {
     }
 }
 
+impl Display for DefaultParquetFileReaderFactory {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "DefaultParquetFileReaderFactory")
+    }
+}
+
+impl Debug for DefaultParquetFileReaderFactory {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "DefaultParquetFileReaderFactory")
+    }
+}
+
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: ParquetFileMetrics,
+    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        Ok(Box::new(ParquetFileReader {
+            meta: file_meta.object_meta,
+            store: Arc::clone(&self.store),
+            metadata_size_hint,
+            metrics,
+        }))
+    }
+}
+
+struct ThinFileReader {

Review Comment:
   We should probably fix the need for this upstream. In the meantime, perhaps 
we could call this `BoxedAsyncFileReader` to make clear what it is for.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -390,6 +433,63 @@ impl AsyncFileReader for ParquetFileReader {
     }
 }
 
+impl Display for DefaultParquetFileReaderFactory {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "DefaultParquetFileReaderFactory")
+    }
+}
+
+impl Debug for DefaultParquetFileReaderFactory {

Review Comment:
   `#[derive(Debug)]`?



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -210,27 +216,39 @@ impl ExecutionPlan for ParquetExec {
     fn execute(
         &self,
         partition_index: usize,
-        context: Arc<TaskContext>,
+        ctx: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let projection = match 
self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
 
+        let parquet_file_reader_factory =
+            if let Some(factory) = self.parquet_file_reader_factory.as_ref() {

Review Comment:
   Could use `unwrap_or_else` here.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -81,11 +83,13 @@ pub struct ParquetExec {
     pruning_predicate: Option<PruningPredicate>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
+    /// Optional user defined parquet file reader factory
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 /// Stores metrics about the parquet execution for a particular parquet file
 #[derive(Debug, Clone)]
-struct ParquetFileMetrics {
+pub struct ParquetFileMetrics {

Review Comment:
   It would be good if we could find a way to not make this public, as we're 
likely to want to alter it over time.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -342,7 +364,28 @@ impl FileOpener for ParquetOpener {
                 });
 
             Ok(adapted.boxed())
-        })
+        }))
+    }
+}
+
+pub trait ParquetFileReaderFactory:
+    std::fmt::Display + Debug + Send + Sync + 'static

Review Comment:
   Why does this need `Display`?



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -100,6 +104,7 @@ impl ParquetExec {
         base_config: FileScanConfig,
         predicate: Option<Expr>,
         metadata_size_hint: Option<usize>,
+        parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,

Review Comment:
   I think a doc comment explaining what this is would be good.
   
   Also, this is technically a breaking change.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -342,7 +364,28 @@ impl FileOpener for ParquetOpener {
                 });
 
             Ok(adapted.boxed())
-        })
+        }))
+    }
+}
+
+pub trait ParquetFileReaderFactory:
+    std::fmt::Display + Debug + Send + Sync + 'static
+{
+    fn create_reader(
+        &self,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: ParquetFileMetrics,

Review Comment:
   I think we could just remove this `metrics` parameter from here.



##########
datafusion/core/src/datasource/listing/mod.rs:
##########
@@ -58,6 +59,8 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// An optional field for user defined per object metadata  
+    pub metadata_ext: Option<FileMetaExt>,

Review Comment:
   I wonder if `Arc<dyn std::any::Any>` might be better? And just call it 
`extensions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to