This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bbd1f0486 feat: Allow creating a ValuesExec from record batches (#7444)
3bbd1f0486 is described below

commit 3bbd1f04860e256cb5db407ec0d279ff202cd65b
Author: Sean Smith <[email protected]>
AuthorDate: Wed Sep 6 10:39:56 2023 -0500

    feat: Allow creating a ValuesExec from record batches (#7444)
---
 datafusion/core/src/physical_plan/values.rs | 69 ++++++++++++++++++++++++++++-
 1 file changed, 68 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index ff5e719690..539a88a9d5 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -88,8 +88,35 @@ impl ValuesExec {
         Ok(Self { schema, data })
     }
 
+    /// Create a new plan using the provided schema and batches.
+    ///
+    /// Errors if any of the batches don't match the provided schema, or if no
+    /// batches are provided.
+    pub fn try_new_from_batches(
+        schema: SchemaRef,
+        batches: Vec<RecordBatch>,
+    ) -> Result<Self> {
+        if batches.is_empty() {
+            return plan_err!("Values list cannot be empty");
+        }
+
+        for batch in &batches {
+            let batch_schema = batch.schema();
+            if batch_schema != schema {
+                return plan_err!(
+                    "Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
+                );
+            }
+        }
+
+        Ok(ValuesExec {
+            schema,
+            data: batches,
+        })
+    }
+
     /// provides the data
-    fn data(&self) -> Vec<RecordBatch> {
+    pub fn data(&self) -> Vec<RecordBatch> {
         self.data.clone()
     }
 }
@@ -168,7 +195,10 @@ impl ExecutionPlan for ValuesExec {
 #[cfg(test)]
 mod tests {
     use super::*;
+
+    use crate::test::create_vec_batches;
     use crate::test_util;
+    use arrow_schema::{DataType, Field, Schema};
 
     #[tokio::test]
     async fn values_empty_case() -> Result<()> {
@@ -177,4 +207,41 @@ mod tests {
         assert!(empty.is_err());
         Ok(())
     }
+
+    #[test]
+    fn new_exec_with_batches() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col0",
+            DataType::UInt32,
+            false,
+        )]));
+        let batches = create_vec_batches(&schema, 10);
+        let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
+    }
+
+    #[test]
+    fn new_exec_with_batches_empty() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col0",
+            DataType::UInt32,
+            false,
+        )]));
+        let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
+    }
+
+    #[test]
+    fn new_exec_with_batches_invalid_schema() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col0",
+            DataType::UInt32,
+            false,
+        )]));
+        let batches = create_vec_batches(&schema, 10);
+
+        let invalid_schema = Arc::new(Schema::new(vec![
+            Field::new("col0", DataType::UInt32, false),
+            Field::new("col1", DataType::Utf8, false),
+        ]));
+        let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
+    }
 }

Reply via email to