This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec18bb  Consolidate Schema and RecordBatch projection (#1638)
6ec18bb is described below

commit 6ec18bb4a53f684efd8d97443c55035eb37bda14
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Jan 23 14:14:04 2022 -0500

    Consolidate Schema and RecordBatch projection (#1638)
---
 ballista/rust/core/src/memory_stream.rs    | 13 +++++-----
 datafusion/src/datasource/empty.rs         | 14 +++-------
 datafusion/src/datasource/listing/table.rs |  9 ++-----
 datafusion/src/datasource/memory.rs        | 10 +++++---
 datafusion/src/physical_plan/memory.rs     | 41 +++++++++---------------------
 datafusion/src/physical_plan/mod.rs        | 40 +++++++++++++++++++++++++++++
 datafusion/tests/custom_sources.rs         |  9 ++-----
 datafusion/tests/statistics.rs             | 18 ++++---------
 8 files changed, 77 insertions(+), 77 deletions(-)

diff --git a/ballista/rust/core/src/memory_stream.rs 
b/ballista/rust/core/src/memory_stream.rs
index ab72bdc..0c0ba4b 100644
--- a/ballista/rust/core/src/memory_stream.rs
+++ b/ballista/rust/core/src/memory_stream.rs
@@ -67,13 +67,12 @@ impl Stream for MemoryStream {
             let batch = &self.data[self.index - 1];
 
             // apply projection
-            match &self.projection {
-                Some(columns) => Some(RecordBatch::try_new(
-                    self.schema.clone(),
-                    columns.iter().map(|i| batch.column(*i).clone()).collect(),
-                )),
-                None => Some(Ok(batch.clone())),
-            }
+            let next_batch = match &self.projection {
+                Some(projection) => batch.project(projection)?,
+                None => batch.clone(),
+            };
+
+            Some(Ok(next_batch))
         } else {
             None
         })
diff --git a/datafusion/src/datasource/empty.rs 
b/datafusion/src/datasource/empty.rs
index 9665605..5622d15 100644
--- a/datafusion/src/datasource/empty.rs
+++ b/datafusion/src/datasource/empty.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use crate::datasource::TableProvider;
 use crate::error::Result;
 use crate::logical_plan::Expr;
+use crate::physical_plan::project_schema;
 use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};
 
 /// A table with a schema but no data.
@@ -57,16 +58,7 @@ impl TableProvider for EmptyTable {
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // even though there is no data, projections apply
-        let projection = match projection.clone() {
-            Some(p) => p,
-            None => (0..self.schema.fields().len()).collect(),
-        };
-        let projected_schema = Schema::new(
-            projection
-                .iter()
-                .map(|i| self.schema.field(*i).clone())
-                .collect(),
-        );
-        Ok(Arc::new(EmptyExec::new(false, Arc::new(projected_schema))))
+        let projected_schema = project_schema(&self.schema, 
projection.as_ref())?;
+        Ok(Arc::new(EmptyExec::new(false, projected_schema)))
     }
 }
diff --git a/datafusion/src/datasource/listing/table.rs 
b/datafusion/src/datasource/listing/table.rs
index ff6d322..2f8f70f 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -29,7 +29,7 @@ use crate::{
     physical_plan::{
         empty::EmptyExec,
         file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
-        ExecutionPlan, Statistics,
+        project_schema, ExecutionPlan, Statistics,
     },
 };
 
@@ -179,12 +179,7 @@ impl TableProvider for ListingTable {
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
             let schema = self.schema();
-            let projected_schema = match &projection {
-                None => schema,
-                Some(p) => Arc::new(Schema::new(
-                    p.iter().map(|i| schema.field(*i).clone()).collect(),
-                )),
-            };
+            let projected_schema = project_schema(&schema, 
projection.as_ref())?;
             return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
         }
 
diff --git a/datafusion/src/datasource/memory.rs 
b/datafusion/src/datasource/memory.rs
index c732b17..5fad702 100644
--- a/datafusion/src/datasource/memory.rs
+++ b/datafusion/src/datasource/memory.rs
@@ -147,6 +147,7 @@ mod tests {
     use crate::from_slice::FromSlice;
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::error::ArrowError;
     use futures::StreamExt;
     use std::collections::HashMap;
 
@@ -235,10 +236,13 @@ mod tests {
         let projection: Vec<usize> = vec![0, 4];
 
         match provider.scan(&Some(projection), &[], None).await {
-            Err(DataFusionError::Internal(e)) => {
-                assert_eq!("\"Projection index out of range\"", 
format!("{:?}", e))
+            Err(DataFusionError::ArrowError(ArrowError::SchemaError(e))) => {
+                assert_eq!(
+                    "\"project index 4 out of bounds, max field 3\"",
+                    format!("{:?}", e)
+                )
             }
-            _ => panic!("Scan should failed on invalid projection"),
+            res => panic!("Scan should failed on invalid projection, got 
{:?}", res),
         };
 
         Ok(())
diff --git a/datafusion/src/physical_plan/memory.rs 
b/datafusion/src/physical_plan/memory.rs
index 61be207..8e32b09 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -23,11 +23,11 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::{
-    common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use crate::error::{DataFusionError, Result};
-use arrow::datatypes::{Field, Schema, SchemaRef};
+use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
@@ -136,24 +136,7 @@ impl MemoryExec {
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
-        let projected_schema = match &projection {
-            Some(columns) => {
-                let fields: Result<Vec<Field>> = columns
-                    .iter()
-                    .map(|i| {
-                        if *i < schema.fields().len() {
-                            Ok(schema.field(*i).clone())
-                        } else {
-                            Err(DataFusionError::Internal(
-                                "Projection index out of range".to_string(),
-                            ))
-                        }
-                    })
-                    .collect();
-                Arc::new(Schema::new(fields?))
-            }
-            None => Arc::clone(&schema),
-        };
+        let projected_schema = project_schema(&schema, projection.as_ref())?;
         Ok(Self {
             partitions: partitions.to_vec(),
             schema,
@@ -201,14 +184,14 @@ impl Stream for MemoryStream {
         Poll::Ready(if self.index < self.data.len() {
             self.index += 1;
             let batch = &self.data[self.index - 1];
-            // apply projection
-            match &self.projection {
-                Some(columns) => Some(RecordBatch::try_new(
-                    self.schema.clone(),
-                    columns.iter().map(|i| batch.column(*i).clone()).collect(),
-                )),
-                None => Some(Ok(batch.clone())),
-            }
+
+            // return just the columns requested
+            let batch = match self.projection.as_ref() {
+                Some(columns) => batch.project(columns)?,
+                None => batch.clone(),
+            };
+
+            Some(Ok(batch))
         } else {
             None
         })
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index ce12722..216d4a6 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -586,6 +586,46 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+/// Applies an optional projection to a [`SchemaRef`], returning the
+/// projected schema
+///
+/// Example:
+/// ```
+/// use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+/// use datafusion::physical_plan::project_schema;
+///
+/// // Schema with columns 'a', 'b', and 'c'
+/// let schema = SchemaRef::new(Schema::new(vec![
+///   Field::new("a", DataType::Int32, true),
+///   Field::new("b", DataType::Int64, true),
+///   Field::new("c", DataType::Utf8, true),
+/// ]));
+///
+/// // Pick columns 'c' and 'b'
+/// let projection = Some(vec![2,1]);
+/// let projected_schema = project_schema(
+///    &schema,
+///    projection.as_ref()
+///  ).unwrap();
+///
+/// let expected_schema = SchemaRef::new(Schema::new(vec![
+///   Field::new("c", DataType::Utf8, true),
+///   Field::new("b", DataType::Int64, true),
+/// ]));
+///
+/// assert_eq!(projected_schema, expected_schema);
+/// ```
+pub fn project_schema(
+    schema: &SchemaRef,
+    projection: Option<&Vec<usize>>,
+) -> Result<SchemaRef> {
+    let schema = match projection {
+        Some(columns) => Arc::new(schema.project(columns)?),
+        None => Arc::clone(schema),
+    };
+    Ok(schema)
+}
+
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
diff --git a/datafusion/tests/custom_sources.rs 
b/datafusion/tests/custom_sources.rs
index c2511ba..e069dd7 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -34,7 +34,7 @@ use datafusion::logical_plan::{
     col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
 };
 use datafusion::physical_plan::{
-    ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream,
+    project_schema, ColumnStatistics, ExecutionPlan, Partitioning, 
RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 
@@ -108,12 +108,7 @@ impl ExecutionPlan for CustomExecutionPlan {
     }
     fn schema(&self) -> SchemaRef {
         let schema = TEST_CUSTOM_SCHEMA_REF!();
-        match &self.projection {
-            None => schema,
-            Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
-            )),
-        }
+        project_schema(&schema, self.projection.as_ref()).expect("projected 
schema")
     }
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs
index 4964baf..3bc3720 100644
--- a/datafusion/tests/statistics.rs
+++ b/datafusion/tests/statistics.rs
@@ -25,7 +25,7 @@ use datafusion::{
     error::{DataFusionError, Result},
     logical_plan::Expr,
     physical_plan::{
-        ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
+        project_schema, ColumnStatistics, DisplayFormatType, ExecutionPlan, 
Partitioning,
         SendableRecordBatchStream, Statistics,
     },
     prelude::ExecutionContext,
@@ -44,7 +44,7 @@ struct StatisticsValidation {
 }
 
 impl StatisticsValidation {
-    fn new(stats: Statistics, schema: Schema) -> Self {
+    fn new(stats: Statistics, schema: SchemaRef) -> Self {
         assert!(
             stats
                 .column_statistics
@@ -53,10 +53,7 @@ impl StatisticsValidation {
                 .unwrap_or(true),
             "if defined, the column statistics vector length should be the 
number of fields"
         );
-        Self {
-            stats,
-            schema: Arc::new(schema),
-        }
+        Self { stats, schema }
     }
 }
 
@@ -87,12 +84,7 @@ impl TableProvider for StatisticsValidation {
             Some(p) => p,
             None => (0..self.schema.fields().len()).collect(),
         };
-        let projected_schema = Schema::new(
-            projection
-                .iter()
-                .map(|i| self.schema.field(*i).clone())
-                .collect(),
-        );
+        let projected_schema = project_schema(&self.schema, 
Some(&projection))?;
 
         let current_stat = self.stats.clone();
 
@@ -177,7 +169,7 @@ impl ExecutionPlan for StatisticsValidation {
 fn init_ctx(stats: Statistics, schema: Schema) -> Result<ExecutionContext> {
     let mut ctx = ExecutionContext::new();
     let provider: Arc<dyn TableProvider> =
-        Arc::new(StatisticsValidation::new(stats, schema));
+        Arc::new(StatisticsValidation::new(stats, Arc::new(schema)));
     ctx.register_table("stats_table", provider)?;
     Ok(ctx)
 }

Reply via email to