alamb commented on a change in pull request #1165:
URL: https://github.com/apache/arrow-datafusion/pull/1165#discussion_r734959718
##########
File path: datafusion/tests/sql.rs
##########
@@ -476,6 +476,180 @@ async fn csv_query_group_by_float32() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn select_values_list() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ {
+ let sql = "VALUES (1)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES ()";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),(2)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "| 2 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1),()";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,'a'),(2,'b')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | a |",
+ "| 2 | b |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1),(1,2)";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),('2')";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),(2.0)";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,2), (1,'2')";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,'a'),(NULL,'b'),(3,'c')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | a |",
+ "| | b |",
+ "| 3 | c |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (NULL,'a'),(NULL,'b'),(3,'c')";
Review comment:
LOL, you have already covered every case I could come up with to test.
##########
File path: datafusion/tests/sql.rs
##########
@@ -476,6 +476,180 @@ async fn csv_query_group_by_float32() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn select_values_list() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ {
+ let sql = "VALUES (1)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES ()";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),(2)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "| 2 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1),()";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,'a'),(2,'b')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | a |",
+ "| 2 | b |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1),(1,2)";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),('2')";
Review comment:
👍 for testing the negative case
##########
File path: datafusion/src/optimizer/constant_folding.rs
##########
@@ -77,6 +77,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::Values { .. }
Review comment:
I suspect it is not likely to matter, but constant folding could be
applied to the `Expr`s in `values`. As written, this code will not apply
constant folding to those expressions.
##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -702,6 +716,9 @@ impl LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self.0 {
LogicalPlan::EmptyRelation { .. } => write!(f,
"EmptyRelation"),
+ LogicalPlan::Values { ref values, .. } => {
+ write!(f, "Values: {} rows", values.len())
+ }
Review comment:
What do you think about adding the values here, so that we can see the
values in an explain plan?
As is, this PR makes the output look like this:
```
> explain SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) as
t(num, letter);
+---------------+-----------------------------------------------------------------------+
| plan_type | plan
|
+---------------+-----------------------------------------------------------------------+
| logical_plan | Projection: #t.num, #t.letter
|
| | Projection: #t.column1 AS num, #t.column2 AS letter,
alias=t |
| | Projection: #column1, #column2, alias=t
|
| | Values: 3 rows
|
| physical_plan | ProjectionExec: expr=[num@0 as num, letter@1 as letter]
|
| | ProjectionExec: expr=[column1@0 as num, column2@1 as
letter] |
| | ProjectionExec: expr=[column1@0 as column1, column2@1
as column2] |
| | RepartitionExec: partitioning=RoundRobinBatch(16)
|
| | ValuesExec
|
| |
|
+---------------+-----------------------------------------------------------------------+
2 rows in set. Query took 0.005 seconds.
```
```suggestion
LogicalPlan::Values { ref values, .. } => {
let str_values : Vec<_> = values
.iter()
// limit to only 5 values to avoid horrible
display
.take(5)
.map(|row| {
let item = row.iter()
.map(|expr| expr.to_string())
.collect::<Vec<_>>()
.join(", ");
format!("({})", item)
})
.collect();
let elipse = if values.len() > 5 { "..." } else { ""
};
write!(f, "Values: {}{}", str_values.join(", "),
elipse)
}
```
With the suggestion, the output looks like this:
```sql
> explain SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) as
t(num, letter);
+---------------+-------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+-------------------------------------------------------------------------------------------+
| logical_plan | Projection: #t.num, #t.letter
|
| | Projection: #t.column1 AS num, #t.column2 AS letter,
alias=t |
| | Projection: #column1, #column2, alias=t
|
| | Values: (Int64(1), Utf8("one")), (Int64(2),
Utf8("two")), (Int64(3), Utf8("three")) |
| physical_plan | ProjectionExec: expr=[num@0 as num, letter@1 as letter]
|
| | ProjectionExec: expr=[column1@0 as num, column2@1 as
letter] |
| | ProjectionExec: expr=[column1@0 as column1, column2@1
as column2] |
| | RepartitionExec: partitioning=RoundRobinBatch(16)
|
| | ValuesExec
|
| |
|
+---------------+-------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.008 seconds.
```
Doing something similar for the physical plan (`ValuesExec`) would be cool too.
##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -111,6 +111,79 @@ impl LogicalPlanBuilder {
})
}
+ /// Create a values list based relation, and the schema is inferred from
data. This will consume
+ /// and mut the given value vec.
Review comment:
```suggestion
/// Create a values list based relation, and the schema is inferred from
data, consuming
/// `value`. See the [Postgres
VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
```
##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -203,6 +203,13 @@ pub enum LogicalPlan {
/// Whether the CSV file contains a header
has_header: bool,
},
+ /// Values expression
Review comment:
```suggestion
/// Values expression. See
/// [Postgres
VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
```
##########
File path: datafusion/src/optimizer/utils.rs
##########
@@ -124,6 +124,10 @@ pub fn from_plan(
schema: schema.clone(),
alias: alias.clone(),
}),
+ LogicalPlan::Values { schema, values } => Ok(LogicalPlan::Values {
+ schema: schema.clone(),
+ values: values.to_vec(),
Review comment:
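I think the values here should be derived from `expr` - the various
optimizers call `from_plan` to create a new logical plan after potentially
rewriting expressions returned from `LogicalPlan::expressions`.
Assuming `expressions` returns the row values flattened row by row, they need
to be regrouped into non-overlapping rows of the original width -- hence
`chunks` rather than `windows`, which would yield overlapping slices.
Something like (untested):
```rust
values : expr.chunks(values[0].len()).map(|row| row.to_vec()).collect()
```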
I'll add some comments to make that clearer
##########
File path: datafusion/src/physical_plan/values.rs
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Values execution plan
+
+use super::{common, SendableRecordBatchStream, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+ memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution,
ExecutionPlan,
+ Partitioning, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Execution plan for values list based relation (produces constant rows)
+#[derive(Debug)]
+pub struct ValuesExec {
+ /// The schema
+ schema: SchemaRef,
+ /// The data
+ data: Vec<RecordBatch>,
+}
+
+impl ValuesExec {
+ /// create a new values exec from data as expr
+ pub fn try_new(
+ schema: SchemaRef,
+ data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+ ) -> Result<Self> {
+ if data.is_empty() {
+ return Err(DataFusionError::Plan("Values list cannot be
empty".into()));
+ }
+ // we have this empty batch as a placeholder to satisfy evaluation
argument
Review comment:
I wonder what you think about moving the creation of the actual
`RecordBatch` from `ValuesExec::try_new` to `execute` -- the rationale would be
to make physical plan creation faster and push the actual work into `execute`,
where it can potentially be run concurrently with other parts.
Given the size of the data in a `VALUES` statement, this is not likely to make any
real difference, so I am fine with leaving the creation in the same place too --
I just wanted to mention it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]