This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c92d5db Metadata is kept in projections for non-derived columns
(#1378)
c92d5db is described below
commit c92d5db8f6277a4a56422f6d9cdff2bc5b2a704f
Author: Stephen Carman <[email protected]>
AuthorDate: Thu Dec 9 10:18:49 2021 -0500
Metadata is kept in projections for non-derived columns (#1378)
* Metadata is kept in projections for non-derived columns
* Schema carries over its metadata now as well
* Propagate errors properly instead of unwraps
* cargo fmt and fix a lint
* revert testing changes (I hope)
Co-authored-by: Stephen Carman <[email protected]>
---
datafusion/src/physical_plan/projection.rs | 26 +++++++++++++++++---------
datafusion/src/test_util.rs | 13 ++++++++++---
2 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/datafusion/src/physical_plan/projection.rs
b/datafusion/src/physical_plan/projection.rs
index eb0c4b8..e2be2a0 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -37,7 +37,6 @@ use super::expressions::Column;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use async_trait::async_trait;
-
use futures::stream::Stream;
use futures::stream::StreamExt;
@@ -62,18 +61,22 @@ impl ProjectionExec {
) -> Result<Self> {
let input_schema = input.schema();
- let fields: Result<Vec<_>> = expr
+ let fields: Result<Vec<Field>> = expr
.iter()
- .map(|(e, name)| {
- Ok(Field::new(
- name,
- e.data_type(&input_schema)?,
- e.nullable(&input_schema)?,
- ))
+ .map(|(e, name)| match input_schema.field_with_name(name) {
+ Ok(f) => Ok(f.clone()),
+ Err(_) => {
+ let dt = e.data_type(&input_schema)?;
+ let nullable = e.nullable(&input_schema)?;
+ Ok(Field::new(name, dt, nullable))
+ }
})
.collect();
- let schema = Arc::new(Schema::new(fields?));
+ let schema = Arc::new(Schema::new_with_metadata(
+ fields?,
+ input_schema.metadata().clone(),
+ ));
Ok(Self {
expr,
@@ -296,6 +299,11 @@ mod tests {
Arc::new(csv),
)?;
+ let col_field = projection.schema.field(0);
+ let col_metadata = col_field.metadata().clone().unwrap().clone();
+ let data: &str = &col_metadata["testing"];
+ assert_eq!(data, "test");
+
let mut partition_count = 0;
let mut row_count = 0;
for partition in 0..projection.output_partitioning().partition_count()
{
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index 8548c62..f1fb4db 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -17,6 +17,7 @@
//! Utility functions to make testing DataFusion based crates easier
+use std::collections::BTreeMap;
use std::{env, error::Error, path::PathBuf, sync::Arc};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -229,8 +230,12 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) ->
Result<PathBuf, Box<dyn
/// Get the schema for the aggregate_test_* csv files
pub fn aggr_test_schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Utf8, false),
+ let mut f1 = Field::new("c1", DataType::Utf8, false);
+ f1.set_metadata(Some(BTreeMap::from_iter(
+ vec![("testing".into(), "test".into())].into_iter(),
+ )));
+ let schema = Schema::new(vec![
+ f1,
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
@@ -243,7 +248,9 @@ pub fn aggr_test_schema() -> SchemaRef {
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
- ]))
+ ]);
+
+ Arc::new(schema)
}
#[cfg(test)]