alamb commented on code in PR #19111:
URL: https://github.com/apache/datafusion/pull/19111#discussion_r2607020890


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -157,29 +157,39 @@ impl FileOpener for ParquetOpener {
         // there are other cases where partition columns may appear in more
         // complex predicates that cannot be simplified until we are about to
         // open the file (such as dynamic predicates)
-        let mut predicate = {
-            // Only replace partition columns if there are partition values.
-            // For non-partitioned tables, skip the clone and replacement 
traversal.
-            if partitioned_file.partition_values.is_empty() {
-                self.predicate.clone()
-            } else {
-                let partition_values: HashMap<&str, &ScalarValue> = self
-                    .table_schema
-                    .table_partition_cols()
-                    .iter()
-                    .zip(partitioned_file.partition_values.iter())
-                    .map(|(field, value)| (field.name().as_str(), value))
-                    .collect();
-
-                self.predicate
-                    .clone()
-                    .map(|p| replace_columns_with_literals(p, 
&partition_values))
-                    .transpose()?
-            }
-        };
+        let partition_values: HashMap<&str, &ScalarValue> = self
+            .table_schema
+            .table_partition_cols()
+            .iter()
+            .zip(partitioned_file.partition_values.iter())
+            .map(|(field, value)| (field.name().as_str(), value))
+            .collect();
+
+        // Calculate the output schema from the original projection (before 
literal replacement)
+        // so we get correct field names from column references
+        let logical_file_schema = Arc::clone(self.table_schema.file_schema());
+        let output_schema = Arc::new(
+            self.projection
+                .project_schema(self.table_schema.table_schema())?,
+        );
 
-        let projection = Arc::clone(&self.projection);
-        let logical_file_schema = Arc::clone(self.table_schema.table_schema());
+        // Apply partition column replacement to projection expressions
+        let mut projection = self.projection.clone();

Review Comment:
   I now find this code very nice and easy to follow.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -517,13 +518,34 @@ impl FileOpener for ParquetOpener {
             let projector = projection.make_projector(&stream_schema)?;
 
             let stream = stream.map_err(DataFusionError::from).map(move |b| {
-                b.and_then(|b| {
+                b.and_then(|mut b| {
                     copy_arrow_reader_metrics(
                         &arrow_reader_metrics,
                         &predicate_cache_inner_records,
                         &predicate_cache_records,
                     );
-                    projector.project_batch(&b)
+                    b = projector.project_batch(&b)?;
+                    if replace_schema {
+                        // Ensure the output batch has the expected schema.

Review Comment:
   Thank you for the comments -- I see now that this is similar to, but not the 
same as, what SchemaAdapter did.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to