This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c92d5db  Metadata is kept in projections for non-derived columns (#1378)
c92d5db is described below

commit c92d5db8f6277a4a56422f6d9cdff2bc5b2a704f
Author: Stephen Carman <[email protected]>
AuthorDate: Thu Dec 9 10:18:49 2021 -0500

    Metadata is kept in projections for non-derived columns (#1378)
    
    * Metadata is kept in projections for non-derived columns
    
    * Schema carries over its metadata now as well
    
    * Propagate errors properly instead of unwraps
    
    * cargo fmt and fix a lint
    
    * revert testing changes (I hope)
    
    Co-authored-by: Stephen Carman <[email protected]>
---
 datafusion/src/physical_plan/projection.rs | 26 +++++++++++++++++---------
 datafusion/src/test_util.rs                | 13 ++++++++++---
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index eb0c4b8..e2be2a0 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -37,7 +37,6 @@ use super::expressions::Column;
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
 use async_trait::async_trait;
-
 use futures::stream::Stream;
 use futures::stream::StreamExt;
 
@@ -62,18 +61,22 @@ impl ProjectionExec {
     ) -> Result<Self> {
         let input_schema = input.schema();
 
-        let fields: Result<Vec<_>> = expr
+        let fields: Result<Vec<Field>> = expr
             .iter()
-            .map(|(e, name)| {
-                Ok(Field::new(
-                    name,
-                    e.data_type(&input_schema)?,
-                    e.nullable(&input_schema)?,
-                ))
+            .map(|(e, name)| match input_schema.field_with_name(name) {
+                Ok(f) => Ok(f.clone()),
+                Err(_) => {
+                    let dt = e.data_type(&input_schema)?;
+                    let nullable = e.nullable(&input_schema)?;
+                    Ok(Field::new(name, dt, nullable))
+                }
             })
             .collect();
 
-        let schema = Arc::new(Schema::new(fields?));
+        let schema = Arc::new(Schema::new_with_metadata(
+            fields?,
+            input_schema.metadata().clone(),
+        ));
 
         Ok(Self {
             expr,
@@ -296,6 +299,11 @@ mod tests {
             Arc::new(csv),
         )?;
 
+        let col_field = projection.schema.field(0);
+        let col_metadata = col_field.metadata().clone().unwrap().clone();
+        let data: &str = &col_metadata["testing"];
+        assert_eq!(data, "test");
+
         let mut partition_count = 0;
         let mut row_count = 0;
         for partition in 0..projection.output_partitioning().partition_count() {
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index 8548c62..f1fb4db 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -17,6 +17,7 @@
 
 //! Utility functions to make testing DataFusion based crates easier
 
+use std::collections::BTreeMap;
 use std::{env, error::Error, path::PathBuf, sync::Arc};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -229,8 +230,12 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
 
 /// Get the schema for the aggregate_test_* csv files
 pub fn aggr_test_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
+    let mut f1 = Field::new("c1", DataType::Utf8, false);
+    f1.set_metadata(Some(BTreeMap::from_iter(
+        vec![("testing".into(), "test".into())].into_iter(),
+    )));
+    let schema = Schema::new(vec![
+        f1,
         Field::new("c2", DataType::UInt32, false),
         Field::new("c3", DataType::Int8, false),
         Field::new("c4", DataType::Int16, false),
@@ -243,7 +248,9 @@ pub fn aggr_test_schema() -> SchemaRef {
         Field::new("c11", DataType::Float32, false),
         Field::new("c12", DataType::Float64, false),
         Field::new("c13", DataType::Utf8, false),
-    ]))
+    ]);
+
+    Arc::new(schema)
 }
 
 #[cfg(test)]

Reply via email to