init-js opened a new issue, #15837:
URL: https://github.com/apache/datafusion/issues/15837

   ### Describe the bug
   
   Our goal is to take an existing `DataFrame` and change the parquet field ids 
(after the fact) of its schema. The function `Projection::new_from_schema` 
looks promising, in that it seems to be in a position to update the output 
schema. 
   
   However, after executing the plan and collecting the record batches, we notice that the field metadata on our output schema matches the original field ids, rather than the field metadata we assigned in the outer Projection.
   
   Should this be a supported operation?
   
   We also considered writing the dataframe to another table (with a different 
schema) as an alternative, but we would like to avoid additional copies. All we 
want to change are the field ids.
   
   A code snippet illustrating how we wrap the DataFrame in another LogicalPlan:
   
   
   ```rust
   fn with_field_ids(original_df: DataFrame, mapping: &HashMap<String, u64>) -> Result<DataFrame> {
       let (session_state, input_plan) = original_df.into_parts();

       // This transformation is elided, but all we do is copy all the fields
       // over to a new schema, assigning new values for the field metadata key
       // `PARQUET_FIELD_ID_META_KEY`.
       let remapped_schema: Schema =
           modify_field_ids(input_plan.schema().as_arrow().clone(), ...)?;

       // The rest below "wraps" the original dataframe in a new projection.
       let remapped_df_schema: DFSchema = remapped_schema
           .to_dfschema()?
           .with_functional_dependencies(input_plan.schema().functional_dependencies().clone())?;

       // Wrap the plan in another logical plan that applies our modified schema.
       let output_plan = LogicalPlan::Projection(Projection::new_from_schema(
           Arc::new(input_plan),
           Arc::new(remapped_df_schema),
       ));

       Ok(DataFrame::new(session_state, output_plan))
   }
   ```
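   For context, the elided transformation could be sketched roughly as below. This is a minimal, infallible sketch assuming the `arrow-schema` crate; the helper name `modify_field_ids` and the inlined `PARQUET_FIELD_ID_META_KEY` constant are from our code (the constant also exists as `parquet::arrow::PARQUET_FIELD_ID_META_KEY`), and real code would return a `Result`:

   ```rust
   use std::collections::HashMap;

   use arrow_schema::{DataType, Field, Schema};

   // Metadata key that the parquet writer/reader uses for field ids.
   const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

   /// Copy every field into a new schema, overwriting the field-id metadata
   /// entry for any field name present in `mapping`.
   fn modify_field_ids(schema: Schema, mapping: &HashMap<String, u64>) -> Schema {
       let fields: Vec<Field> = schema
           .fields()
           .iter()
           .map(|f| {
               let mut metadata = f.metadata().clone();
               if let Some(id) = mapping.get(f.name()) {
                   metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string());
               }
               f.as_ref().clone().with_metadata(metadata)
           })
           .collect();
       // Preserve any schema-level metadata unchanged.
       Schema::new_with_metadata(fields, schema.metadata().clone())
   }
   ```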
   
   
   ### To Reproduce
   
   I'm outputting a debug print of the resulting plan.
   
   We apply our transformation, execute the plan (with `collect()`), and then compare the metadata in the output schema with what we expect. What we find instead is the original schema.
   
   ```rust
   ... // elided. We register a table from a parquet file with a column 
"bucket", with field id 100
   
   let df = ctx.sql("SELECT bucket FROM mytable").await.unwrap();
   
   let mut remap: HashMap<String, u64> = HashMap::new();
   remap.insert("bucket".to_string(), 111);
   
   // this consumes the original dataframe and wraps it in a new schema 
projection
   // where the "bucket" field id has been set to 111.
   let df_remapped = with_field_ids(df, &remap).expect("nothing to remap");
   
   // shown below in ticket
   dbg!("OUTPUT PLAN", &df_remapped.logical_plan());
   
   let x = df_remapped.collect().await.expect("should not fail");
   let batch = x.get(0).unwrap();
   let output_schema = batch.schema();
   
   // shown below in ticket
   dbg!("OUTPUT_SCHEMA", &output_schema);
   
   // extract the id from the parquet metadata. 
   
   let parq_schema_descriptor = arrow_to_parquet_schema(&output_schema)
       .expect("should be able to get descriptor");
   let parq_fields = parq_schema_descriptor.root_schema().get_fields();
   assert_eq!(parq_fields.len(), 1);
   
   // this fails. the original id comes to the surface. our field id from the 
outermost projection is ignored.
   assert_eq!(parq_fields[0].get_basic_info().id(), 111); 
   ```
   
   ### Expected behavior
   
   We would expect the output schema when we execute the dataframe's plan to 
reflect the "outermost" projection's schema and have "PARQUET:field_id": "111".
   
   But the inner plan's schema is used instead.
   
   ### Additional context
   
   Note that the outer logical plan's schema has a field id of 111 (`"PARQUET:field_id": "111"`), which is the desired output, while the inner logical plan's schema has the original field id (`"PARQUET:field_id": "100"`).
   
   ```
   [src/schema/mod.rs:599:9] &df_remapped.logical_plan() = Projection(
       Projection {
           expr: [
               Column(
                   Column {
                       relation: None,
                       name: "bucket",
                   },
               ),
           ],
           input: Projection(
               Projection {
                   expr: [
                       Column(
                           Column {
                               relation: Some(
                                   Bare {
                                       table: "mytable",
                                   },
                               ),
                               name: "bucket",
                           },
                       ),
                   ],
                   input: TableScan(
                       TableScan {
                           table_name: Bare {
                               table: "mytable",
                           },
                           source: "...",
                           projection: None,
                           projected_schema: DFSchema {
                               inner: Schema {
                                   fields: [
                                       Field {
                                           name: "bucket",
                                           data_type: Utf8,
                                           nullable: false,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {
                                               "PARQUET:field_id": "100",
                                           },
                                       },
                                       Field {
                                           name: "key",
                                           data_type: Utf8,
                                           nullable: false,
                                           dict_id: 0,
                                           dict_is_ordered: false,
                                           metadata: {
                                               "PARQUET:field_id": "200",
                                           },
                                       },
                                   ],
                                   metadata: {},
                               },
                               field_qualifiers: [
                                   Some(
                                       Bare {
                                           table: "mytable",
                                       },
                                   ),
                                   Some(
                                       Bare {
                                           table: "mytable",
                                       },
                                   ),
                                   Some(
                                       Bare {
                                           table: "mytable",
                                       },
                                   ),
                                   Some(
                                       Bare {
                                           table: "mytable",
                                       },
                                   ),
                                   Some(
                                       Bare {
                                           table: "mytable",
                                       },
                                   ),
                                   Some(
                                       Bare {
                                           table: "mytable",
                                       },
                                   ),
                               ],
                               functional_dependencies: FunctionalDependencies {
                                   deps: [],
                               },
                           },
                           filters: [],
                           fetch: None,
                           ..
                       },
                   ),
                   schema: DFSchema {
                       inner: Schema {
                           fields: [
                               Field {
                                   name: "bucket",
                                   data_type: Utf8,
                                   nullable: false,
                                   dict_id: 0,
                                   dict_is_ordered: false,
                                   metadata: {
                                       "PARQUET:field_id": "100",
                                   },
                               },
                           ],
                           metadata: {},
                       },
                       field_qualifiers: [
                           Some(
                               Bare {
                                   table: "mytable",
                               },
                           ),
                       ],
                       functional_dependencies: FunctionalDependencies {
                           deps: [],
                       },
                   },
               },
           ),
           schema: DFSchema {
               inner: Schema {
                   fields: [
                       Field {
                           name: "bucket",
                           data_type: Utf8,
                           nullable: false,
                           dict_id: 0,
                           dict_is_ordered: false,
                           metadata: {
                               "PARQUET:field_id": "111",
                           },
                       },
                   ],
                   metadata: {},
               },
               field_qualifiers: [
                   None,
               ],
               functional_dependencies: FunctionalDependencies {
                   deps: [],
               },
           },
       },
   )
   ```
   
   And the output schema of the record batches, after we collect() the 
DataFrame, shows the innermost metadata, not the outer projection:
   
   ```
   [src/schema/mod.rs:604:9] &output_schema = Schema {
       fields: [
           Field {
               name: "bucket",
               data_type: Utf8,
               nullable: false,
               dict_id: 0,
               dict_is_ordered: false,
               metadata: {
                   "PARQUET:field_id": "100",
               },
           },
       ],
       metadata: {},
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
