This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3bbd1f0486 feat: Allow creating a ValuesExec from record batches (#7444)
3bbd1f0486 is described below
commit 3bbd1f04860e256cb5db407ec0d279ff202cd65b
Author: Sean Smith <[email protected]>
AuthorDate: Wed Sep 6 10:39:56 2023 -0500
feat: Allow creating a ValuesExec from record batches (#7444)
---
datafusion/core/src/physical_plan/values.rs | 69 ++++++++++++++++++++++++++++-
1 file changed, 68 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index ff5e719690..539a88a9d5 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -88,8 +88,35 @@ impl ValuesExec {
Ok(Self { schema, data })
}
+ /// Create a new plan using the provided schema and batches.
+ ///
+ /// Errors if any of the batches don't match the provided schema, or if no
+ /// batches are provided.
+ pub fn try_new_from_batches(
+ schema: SchemaRef,
+ batches: Vec<RecordBatch>,
+ ) -> Result<Self> {
+ if batches.is_empty() {
+ return plan_err!("Values list cannot be empty");
+ }
+
+ for batch in &batches {
+ let batch_schema = batch.schema();
+ if batch_schema != schema {
+ return plan_err!(
+ "Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
+ );
+ }
+ }
+
+ Ok(ValuesExec {
+ schema,
+ data: batches,
+ })
+ }
+
/// provides the data
- fn data(&self) -> Vec<RecordBatch> {
+ pub fn data(&self) -> Vec<RecordBatch> {
self.data.clone()
}
}
@@ -168,7 +195,10 @@ impl ExecutionPlan for ValuesExec {
#[cfg(test)]
mod tests {
use super::*;
+
+ use crate::test::create_vec_batches;
use crate::test_util;
+ use arrow_schema::{DataType, Field, Schema};
#[tokio::test]
async fn values_empty_case() -> Result<()> {
@@ -177,4 +207,41 @@ mod tests {
assert!(empty.is_err());
Ok(())
}
+
+ #[test]
+ fn new_exec_with_batches() {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "col0",
+ DataType::UInt32,
+ false,
+ )]));
+ let batches = create_vec_batches(&schema, 10);
+ let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
+ }
+
+ #[test]
+ fn new_exec_with_batches_empty() {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "col0",
+ DataType::UInt32,
+ false,
+ )]));
+ let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
+ }
+
+ #[test]
+ fn new_exec_with_batches_invalid_schema() {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "col0",
+ DataType::UInt32,
+ false,
+ )]));
+ let batches = create_vec_batches(&schema, 10);
+
+ let invalid_schema = Arc::new(Schema::new(vec![
+ Field::new("col0", DataType::UInt32, false),
+ Field::new("col1", DataType::Utf8, false),
+ ]));
+ let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
+ }
}