adriangb commented on code in PR #18627:
URL: https://github.com/apache/datafusion/pull/18627#discussion_r2529214447


##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -208,14 +208,21 @@ impl FileFormat for ArrowFormat {
             conf.table_partition_cols().clone(),
         );
 
-        let source: Arc<dyn FileSource> =
+        let mut source: Arc<dyn FileSource> =
             match is_object_in_arrow_ipc_file_format(object_store, object_location).await
             {
                 Ok(true) => Arc::new(ArrowSource::new_file_source(table_schema)),
                 Ok(false) => Arc::new(ArrowSource::new_stream_file_source(table_schema)),
                 Err(e) => Err(e)?,
             };
 
+        // Preserve projection from the original file source
+        if let Some(projection) = conf.file_source.projection() {
+            if let Some(new_source) = source.try_pushdown_projection(projection)? {
+                source = new_source;
+            }
+        }

Review Comment:
   I want to double check this bit of code. Do we also need to preserve the 
filters? What *is* the original source / why are we recreating it?



##########
datafusion/core/tests/physical_optimizer/partition_statistics.rs:
##########
@@ -620,7 +620,7 @@ mod test {
         let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
         assert_snapshot!(
             plan_string,
-            @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
+            @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"

Review Comment:
   I'm actually not sure where this is coming from... we need to double check 
it's correct / an improvement



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -423,10 +403,13 @@ impl FileScanConfigBuilder {
     ///
     /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
     /// Any unset optional fields will use their default values.
-    pub fn build(self) -> FileScanConfig {
+    ///
+    /// # Errors
+    /// Returns an error if projection pushdown fails or if schema operations fail.
+    pub fn build(self) -> Result<FileScanConfig> {

Review Comment:
   Converting this to return `Result` so that we properly handle the case where 
a `FileSource` cannot accept any projections. Previously this would have been a 
silent bug, but I think it was just never hit because all of our FileSources 
accepted projections.
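
   A minimal sketch of the failure mode, not the actual DataFusion code: 
`SketchSource`, `SketchConfig`, and `SketchConfigBuilder` are hypothetical 
stand-ins, and treating `Ok(None)` as "source declined the projection" is an 
assumption based on the `if let Some(..)` call site in the Arrow hunk.

```rust
/// Hypothetical, simplified stand-in for a file source (not DataFusion's type).
#[derive(Debug, Clone, PartialEq)]
struct SketchSource {
    name: &'static str,
    accepts_projection: bool,
    projection: Option<Vec<usize>>,
}

impl SketchSource {
    /// Assumed contract: `Ok(Some(_))` when the projection is absorbed,
    /// `Ok(None)` when the source cannot accept it.
    fn try_pushdown_projection(
        &self,
        projection: &[usize],
    ) -> Result<Option<SketchSource>, String> {
        if !self.accepts_projection {
            return Ok(None);
        }
        Ok(Some(SketchSource {
            projection: Some(projection.to_vec()),
            ..self.clone()
        }))
    }
}

/// Hypothetical config produced by the builder.
#[derive(Debug, PartialEq)]
struct SketchConfig {
    source: SketchSource,
}

/// Hypothetical builder; only the fields needed for the sketch.
struct SketchConfigBuilder {
    source: SketchSource,
    projection: Option<Vec<usize>>,
}

impl SketchConfigBuilder {
    fn new(source: SketchSource) -> Self {
        Self { source, projection: None }
    }

    fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Fallible build: a declined projection becomes an error instead of
    /// being dropped silently.
    fn build(self) -> Result<SketchConfig, String> {
        let source = match self.projection {
            None => self.source,
            Some(p) => match self.source.try_pushdown_projection(&p)? {
                Some(new_source) => new_source,
                None => {
                    return Err(format!(
                        "{} cannot accept projection {:?}",
                        self.source.name, p
                    ))
                }
            },
        };
        Ok(SketchConfig { source })
    }
}
```

   With an infallible `build()`, the `None` arm would have nowhere to go except 
ignoring the projection, which is the silent behavior described above.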



##########
datafusion/datasource-json/src/source.rs:
##########
@@ -130,8 +149,20 @@ impl FileSource for JsonSource {
         Arc::new(conf)
     }
 
-    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
-        Arc::new(Self { ..self.clone() })

Review Comment:
   It's unclear to me if this contains bugs or not. It seems like the 
projection was silently dropped on the floor...



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -77,14 +74,6 @@ impl FileStream {
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Self> {
         let projected_schema = config.projected_schema();
-        let pc_projector = PartitionColumnProjector::new(

Review Comment:
   We completely nuke partition value handling here in favor of having the 
projection handle it.
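
   To illustrate the idea only (this is not how the PR implements it): if a 
projection can mix file columns with constant partition values, a separate 
partition-column projector is no longer needed. `ProjectedColumn` and 
`project_row` below are hypothetical names, and rows are modelled as plain 
strings to keep the sketch self-contained.

```rust
/// Hypothetical projection entry: either a column read from the file or a
/// constant taken from the file's partition values.
#[derive(Debug, Clone, PartialEq)]
enum ProjectedColumn {
    /// Take the value at this index from the row read out of the file.
    FileColumn(usize),
    /// Emit this partition value for every row of the file.
    PartitionValue(String),
}

/// Applies the projection to one row. Returns `None` if a file column index
/// is out of range for the row.
fn project_row(file_row: &[String], projection: &[ProjectedColumn]) -> Option<Vec<String>> {
    projection
        .iter()
        .map(|column| match column {
            ProjectedColumn::FileColumn(index) => file_row.get(*index).cloned(),
            ProjectedColumn::PartitionValue(value) => Some(value.clone()),
        })
        .collect()
}
```

   Because partition values are just another kind of projected expression here, 
reordering or dropping them is handled by the same code path as file columns.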



##########
datafusion/datasource-avro/src/file_format.rs:
##########
@@ -154,11 +154,7 @@ impl FileFormat for AvroFormat {
         _state: &dyn Session,
         conf: FileScanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let file_schema = Arc::clone(conf.file_schema());
-        let config = FileScanConfigBuilder::from(conf)
-            .with_source(Arc::new(AvroSource::new(file_schema)))
-            .build();
-        Ok(DataSourceExec::from_data_source(config))
+        Ok(DataSourceExec::from_data_source(conf))
     }

Review Comment:
   I'm quite perplexed about the purpose of this method. Should it always just 
return `Ok(DataSourceExec::from_data_source(conf))`? We already have a 
`FileScanConfig`...



##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -20,244 +20,59 @@
 //! # Naming Note
 //!
 //! The naming in this module can be confusing:
-//! - `ArrowFileSource` / `ArrowFileOpener` handle the Arrow IPC **file format**
+//! - `ArrowFileOpener` handles the Arrow IPC **file format**
 //!   (with footer, supports parallel reading)
-//! - `ArrowStreamFileSource` / `ArrowStreamFileOpener` handle the Arrow IPC **stream format**
+//! - `ArrowStreamFileOpener` handles the Arrow IPC **stream format**
 //!   (without footer, sequential only)
+//! - `ArrowSource` is the unified `FileSource` implementation that uses either opener
+//!   depending on the format specified at construction
 //!
-//! Despite the name "ArrowStreamFileSource", it still reads from files - the "Stream"
+//! Despite the name "ArrowStreamFileOpener", it still reads from files - the "Stream"
 //! refers to the Arrow IPC stream format, not streaming I/O. Both formats can be stored
 //! in files on disk or object storage.
 
 use std::sync::Arc;
 use std::{any::Any, io::Cursor};
 
-use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+};
 use datafusion_datasource::{as_file_source, TableSchema};
 
 use arrow::buffer::Buffer;
+use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
 use datafusion_common::error::Result;
-use datafusion_common::{exec_datafusion_err, Statistics};
+use datafusion_common::exec_datafusion_err;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
 use datafusion_datasource::PartitionedFile;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::projection::ProjectionExprs;
 
 use datafusion_datasource::file_stream::FileOpenFuture;
 use datafusion_datasource::file_stream::FileOpener;
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
 
-/// `FileSource` for Arrow IPC file format. Supports range-based parallel reading.
-#[derive(Clone)]
-pub(crate) struct ArrowFileSource {
-    table_schema: TableSchema,
-    metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Option<Statistics>,
-    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-}

Review Comment:
   The idea here was that instead of having a dedicated FileSource for each one 
of Arrow File / Arrow Stream we can have a unified FileSource and two different 
openers. It's less code and complexity.
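
   Roughly the shape being described, as a self-contained sketch rather than 
the real `ArrowSource`: `IpcLayout`, `SketchOpener`, and `UnifiedSource` are 
hypothetical names, and the two constructors only mirror the 
`new_file_source` / `new_stream_file_source` names seen in the diff.

```rust
/// Which Arrow IPC layout the source was constructed for (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum IpcLayout {
    /// File format: has a footer, so byte ranges can be read in parallel.
    File,
    /// Stream format: no footer, so the data is read sequentially.
    Stream,
}

/// Hypothetical opener interface, reduced to what the sketch needs.
trait SketchOpener {
    fn describe(&self) -> &'static str;
    fn supports_range_reads(&self) -> bool;
}

struct FileLayoutOpener;
struct StreamLayoutOpener;

impl SketchOpener for FileLayoutOpener {
    fn describe(&self) -> &'static str {
        "arrow ipc file opener"
    }
    fn supports_range_reads(&self) -> bool {
        true
    }
}

impl SketchOpener for StreamLayoutOpener {
    fn describe(&self) -> &'static str {
        "arrow ipc stream opener"
    }
    fn supports_range_reads(&self) -> bool {
        false
    }
}

/// One source type; the layout chosen at construction selects the opener.
struct UnifiedSource {
    layout: IpcLayout,
}

impl UnifiedSource {
    fn new_file_source() -> Self {
        Self { layout: IpcLayout::File }
    }

    fn new_stream_file_source() -> Self {
        Self { layout: IpcLayout::Stream }
    }

    fn create_opener(&self) -> Box<dyn SketchOpener> {
        match self.layout {
            IpcLayout::File => Box::new(FileLayoutOpener),
            IpcLayout::Stream => Box::new(StreamLayoutOpener),
        }
    }
}
```

   Shared state (schema, metrics, projection) lives once on the unified source, 
and only the format-specific reading logic is duplicated in the openers.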



##########
datafusion/datasource/src/file.rs:
##########
@@ -66,18 +67,16 @@ pub trait FileSource: Send + Sync {
     fn table_schema(&self) -> &crate::table_schema::TableSchema;
     /// Initialize new type with batch size configuration
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
-    /// Initialize new instance with projection information
-    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
-    /// Initialize new instance with projected statistics
-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;

Review Comment:
   `with_projection` becomes `try_pushdown_projection`, and statistics handling 
gets moved into `FileScanConfig`.
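
   A trimmed-down sketch of the new shape, under assumptions: 
`SketchFileSource` is a hypothetical stand-in for the trait, the projection is 
modelled as column indices, and the error type is a plain `String`. The 
`apply_projection` helper mirrors the `if let Some(new_source)` call site from 
the Arrow hunk.

```rust
use std::sync::Arc;

/// Hypothetical, reduced stand-in for the `FileSource` trait.
trait SketchFileSource {
    fn name(&self) -> String;

    /// Fallible and optional, unlike the old infallible `with_projection`:
    /// `Ok(Some(_))` returns a new source with the projection absorbed,
    /// `Ok(None)` means the source did not take it (assumed meaning).
    fn try_pushdown_projection(
        &self,
        projection: &[usize],
    ) -> Result<Option<Arc<dyn SketchFileSource>>, String>;
}

/// A source that can absorb a projection.
struct ColumnarSource {
    projection: Option<Vec<usize>>,
}

impl SketchFileSource for ColumnarSource {
    fn name(&self) -> String {
        match &self.projection {
            Some(p) => format!("columnar{:?}", p),
            None => "columnar".to_string(),
        }
    }

    fn try_pushdown_projection(
        &self,
        projection: &[usize],
    ) -> Result<Option<Arc<dyn SketchFileSource>>, String> {
        let new_source: Arc<dyn SketchFileSource> = Arc::new(ColumnarSource {
            projection: Some(projection.to_vec()),
        });
        Ok(Some(new_source))
    }
}

/// A source that cannot absorb any projection.
struct OpaqueSource;

impl SketchFileSource for OpaqueSource {
    fn name(&self) -> String {
        "opaque".to_string()
    }

    fn try_pushdown_projection(
        &self,
        _projection: &[usize],
    ) -> Result<Option<Arc<dyn SketchFileSource>>, String> {
        Ok(None)
    }
}

/// Replace the source only when the pushdown was accepted.
fn apply_projection(
    mut source: Arc<dyn SketchFileSource>,
    projection: &[usize],
) -> Result<Arc<dyn SketchFileSource>, String> {
    if let Some(new_source) = source.try_pushdown_projection(projection)? {
        source = new_source;
    }
    Ok(source)
}
```

   The `Option` in the return type is what makes a declined projection visible 
to the caller, which the old `with_projection` signature could not express.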



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

