avantgardnerio commented on code in PR #16985:
URL: https://github.com/apache/datafusion/pull/16985#discussion_r3196621898


##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -82,31 +85,94 @@ impl UnnestExec {
         struct_column_indices: Vec<usize>,
         schema: SchemaRef,
         options: UnnestOptions,
-    ) -> Self {
-        let cache = Self::compute_properties(&input, Arc::clone(&schema));
+    ) -> Result<Self> {
+        let cache = Self::compute_properties(
+            &input,
+            &list_column_indices,
+            &struct_column_indices,
+            Arc::clone(&schema),
+        )?;
 
-        UnnestExec {
+        Ok(UnnestExec {
             input,
             schema,
             list_column_indices,
             struct_column_indices,
             options,
             metrics: Default::default(),
             cache,
-        }
+        })
     }
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
+        list_column_indices: &[ListUnnest],
+        struct_column_indices: &[usize],
         schema: SchemaRef,
-    ) -> PlanProperties {
-        PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            input.output_partitioning().to_owned(),
+    ) -> Result<PlanProperties> {
+        // Find out which indices are not unnested, such that they can be 
copied over from the input plan
+        let input_schema = input.schema();
+        let mut unnested_indices = 
BooleanBufferBuilder::new(input_schema.fields().len());
+        unnested_indices.append_n(input_schema.fields().len(), false);
+        for list_unnest in list_column_indices {
+            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
+        }
+        for struct_unnest in struct_column_indices {
+            unnested_indices.set_bit(*struct_unnest, true)
+        }
+        let unnested_indices = unnested_indices.finish();
+        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
+            .filter(|idx| !unnested_indices.value(*idx))
+            .collect();
+
+        // Manually build projection mapping from non-unnested input columns 
to their positions in the output
+        let input_schema = input.schema();
+        let projection_mapping: ProjectionMapping = non_unnested_indices
+            .iter()
+            .map(|&input_idx| {
+                // Find what index the input column has in the output schema
+                let input_field = input_schema.field(input_idx);
+                let output_idx = schema
+                    .fields()
+                    .iter()
+                    .position(|output_field| output_field.name() == 
input_field.name())
+                    .ok_or_else(|| {
+                        exec_datafusion_err!(
+                            "Non-unnested column '{}' must exist in output 
schema",

Review Comment:
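   This has the unfortunate side effect of breaking any `transform_up` that walks
   the tree adding new fields to the schema. Once a rewritten child exposes a
   column that is missing from the `schema` stored on this `UnnestExec`, the
   name lookup above returns an error, so the node can no longer be rebuilt
   (`new` now returns `Result`).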
   
   I'm not sure what the workaround is. Perhaps a custom `transform_up` that
   checks for `UnnestExec` and adds any fields from the new child that are
   missing from `self.schema`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to