This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c1b807  add values list expression (#1165)
3c1b807 is described below

commit 3c1b8079015e8171fa0db2476e1072084027256a
Author: Jiayu Liu <[email protected]>
AuthorDate: Sat Oct 23 21:10:53 2021 +0800

    add values list expression (#1165)
    
    * add values list expression
    
    * apply formatting
---
 ballista/rust/core/proto/ballista.proto            |  18 ++-
 .../rust/core/src/serde/logical_plan/from_proto.rs |  25 +++
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  20 +++
 datafusion/src/logical_plan/builder.rs             |  94 +++++++++--
 datafusion/src/logical_plan/plan.rs                |  35 ++++
 .../src/optimizer/common_subexpr_eliminate.rs      |   1 +
 datafusion/src/optimizer/constant_folding.rs       |   1 +
 datafusion/src/optimizer/projection_push_down.rs   |   1 +
 datafusion/src/optimizer/utils.rs                  |   7 +
 datafusion/src/physical_plan/mod.rs                |   1 +
 datafusion/src/physical_plan/planner.rs            |  26 ++-
 datafusion/src/physical_plan/values.rs             | 168 +++++++++++++++++++
 datafusion/src/sql/planner.rs                      |  32 +++-
 datafusion/tests/sql.rs                            | 178 ++++++++++++++++++++-
 14 files changed, 588 insertions(+), 19 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 338c5a6..95b78fc 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -255,6 +255,7 @@ message LogicalPlanNode {
     WindowNode window = 13;
     AnalyzeNode analyze = 14;
     CrossJoinNode cross_join = 15;
+    ValuesNode values = 16;
   }
 }
 
@@ -316,12 +317,12 @@ message SelectionNode {
   LogicalExprNode expr = 2;
 }
 
-message SortNode{
+message SortNode {
   LogicalPlanNode input = 1;
   repeated LogicalExprNode expr = 2;
 }
 
-message RepartitionNode{
+message RepartitionNode {
   LogicalPlanNode input = 1;
   oneof partition_method {
     uint64 round_robin = 2;
@@ -334,11 +335,11 @@ message HashRepartition {
   uint64 partition_count = 2;
 }
 
-message EmptyRelationNode{
+message EmptyRelationNode {
   bool produce_one_row = 1;
 }
 
-message CreateExternalTableNode{
+message CreateExternalTableNode {
   string name = 1;
   string location = 2;
   FileType file_type = 3;
@@ -346,7 +347,14 @@ message CreateExternalTableNode{
   DfSchema schema = 5;
 }
 
-enum FileType{
+// a node containing data for defining values list. unlike in SQL where it's 
two dimensional, here
+// the list is flattened, and with the field n_cols it can be parsed and 
partitioned into rows
+message ValuesNode {
+  uint64 n_cols = 1;
+  repeated LogicalExprNode values_list = 2;
+}
+
+enum FileType {
   NdJson = 0;
   Parquet = 1;
   CSV = 2;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 07eced7..26231c5 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -60,6 +60,31 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
             ))
         })?;
         match plan {
+            LogicalPlanType::Values(values) => {
+                let n_cols = values.n_cols as usize;
+                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
+                    Ok(Vec::new())
+                } else if values.values_list.len() % n_cols != 0 {
+                    Err(BallistaError::General(format!(
+                        "Invalid values list length, expect {} to be divisible 
by {}",
+                        values.values_list.len(),
+                        n_cols
+                    )))
+                } else {
+                    values
+                        .values_list
+                        .chunks_exact(n_cols)
+                        .map(|r| {
+                            r.iter()
+                                .map(|v| v.try_into())
+                                .collect::<Result<Vec<_>, _>>()
+                        })
+                        .collect::<Result<Vec<_>, _>>()
+                }?;
+                LogicalPlanBuilder::values(values)?
+                    .build()
+                    .map_err(|e| e.into())
+            }
             LogicalPlanType::Projection(projection) => {
                 let input: LogicalPlan = 
convert_box_required!(projection.input)?;
                 let x: Vec<Expr> = projection
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index e79e654..ae25d72 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -675,6 +675,26 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
     fn try_into(self) -> Result<protobuf::LogicalPlanNode, Self::Error> {
         use protobuf::logical_plan_node::LogicalPlanType;
         match self {
+            LogicalPlan::Values { values, .. } => {
+                let n_cols = if values.is_empty() {
+                    0
+                } else {
+                    values[0].len()
+                } as u64;
+                let values_list = values
+                    .iter()
+                    .flatten()
+                    .map(|v| v.try_into())
+                    .collect::<Result<Vec<_>, _>>()?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Values(
+                        protobuf::ValuesNode {
+                            n_cols,
+                            values_list,
+                        },
+                    )),
+                })
+            }
             LogicalPlan::TableScan {
                 table_name,
                 source,
diff --git a/datafusion/src/logical_plan/builder.rs 
b/datafusion/src/logical_plan/builder.rs
index 3a1d127..3c6c444 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -17,16 +17,6 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-};
-
-use arrow::{
-    datatypes::{Schema, SchemaRef},
-    record_batch::RecordBatch,
-};
-
 use crate::datasource::{
     empty::EmptyTable,
     file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
@@ -37,6 +27,16 @@ use crate::datasource::{
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::plan::ToStringifiedPlan;
 use crate::prelude::*;
+use crate::scalar::ScalarValue;
+use arrow::{
+    datatypes::{DataType, Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use std::convert::TryFrom;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use super::dfschema::ToDFSchema;
 use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, 
PlanType};
@@ -111,6 +111,80 @@ impl LogicalPlanBuilder {
         })
     }
 
+    /// Create a values list based relation, and the schema is inferred from 
data, consuming
+    /// `value`. See the [Postgres 
VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+    /// documentation for more details.
+    ///
+    /// By default, it assigns the names column1, column2, etc. to the columns 
of a VALUES table.
+    /// The column names are not specified by the SQL standard and different 
database systems do it differently,
+    /// so it's usually better to override the default names with a table 
alias list.
+    pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
+        if values.is_empty() {
+            return Err(DataFusionError::Plan("Values list cannot be 
empty".into()));
+        }
+        let n_cols = values[0].len();
+        if n_cols == 0 {
+            return Err(DataFusionError::Plan(
+                "Values list cannot be zero length".into(),
+            ));
+        }
+        let empty_schema = DFSchema::empty();
+        let mut field_types: Vec<Option<DataType>> = 
Vec::with_capacity(n_cols);
+        for _ in 0..n_cols {
+            field_types.push(None);
+        }
+        // hold all the null holes so that we can correct their data types 
later
+        let mut nulls: Vec<(usize, usize)> = Vec::new();
+        for (i, row) in values.iter().enumerate() {
+            if row.len() != n_cols {
+                return Err(DataFusionError::Plan(format!(
+                    "Inconsistent data length across values list: got {} 
values in row {} but expected {}",
+                    row.len(),
+                    i,
+                    n_cols
+                )));
+            }
+            field_types = row
+                .iter()
+                .enumerate()
+                .map(|(j, expr)| {
+                    if let Expr::Literal(ScalarValue::Utf8(None)) = expr {
+                        nulls.push((i, j));
+                        Ok(field_types[j].clone())
+                    } else {
+                        let data_type = expr.get_type(&empty_schema)?;
+                        if let Some(prev_data_type) = &field_types[j] {
+                            if prev_data_type != &data_type {
+                                let err = format!("Inconsistent data type 
across values list at row {} column {}", i, j);
+                                return Err(DataFusionError::Plan(err));
+                            }
+                        }
+                        Ok(Some(data_type))
+                    }
+                })
+                .collect::<Result<Vec<Option<DataType>>>>()?;
+        }
+        let fields = field_types
+            .iter()
+            .enumerate()
+            .map(|(j, data_type)| {
+                // naming is following convention 
https://www.postgresql.org/docs/current/queries-values.html
+                let name = &format!("column{}", j + 1);
+                DFField::new(
+                    None,
+                    name,
+                    data_type.clone().unwrap_or(DataType::Utf8),
+                    true,
+                )
+            })
+            .collect::<Vec<_>>();
+        for (i, j) in nulls {
+            values[i][j] = 
Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
+        }
+        let schema = DFSchemaRef::new(DFSchema::new(fields)?);
+        Ok(Self::from(LogicalPlan::Values { schema, values }))
+    }
+
     /// Scan a memory data source
     pub fn scan_memory(
         partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/logical_plan/plan.rs 
b/datafusion/src/logical_plan/plan.rs
index 7552fc6..13921d5 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -203,6 +203,15 @@ pub enum LogicalPlan {
         /// Whether the CSV file contains a header
         has_header: bool,
     },
+    /// Values expression. See
+    /// [Postgres 
VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+    /// documentation for more details.
+    Values {
+        /// The table schema
+        schema: DFSchemaRef,
+        /// Values
+        values: Vec<Vec<Expr>>,
+    },
     /// Produces a relation with string representations of
     /// various parts of the plan
     Explain {
@@ -237,6 +246,7 @@ impl LogicalPlan {
     pub fn schema(&self) -> &DFSchemaRef {
         match self {
             LogicalPlan::EmptyRelation { schema, .. } => schema,
+            LogicalPlan::Values { schema, .. } => schema,
             LogicalPlan::TableScan {
                 projected_schema, ..
             } => projected_schema,
@@ -263,6 +273,7 @@ impl LogicalPlan {
             LogicalPlan::TableScan {
                 projected_schema, ..
             } => vec![projected_schema],
+            LogicalPlan::Values { schema, .. } => vec![schema],
             LogicalPlan::Window { input, schema, .. }
             | LogicalPlan::Aggregate { input, schema, .. }
             | LogicalPlan::Projection { input, schema, .. } => {
@@ -315,6 +326,9 @@ impl LogicalPlan {
     pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
         match self {
             LogicalPlan::Projection { expr, .. } => expr.clone(),
+            LogicalPlan::Values { values, .. } => {
+                values.iter().flatten().cloned().collect()
+            }
             LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
             LogicalPlan::Repartition {
                 partitioning_scheme,
@@ -369,6 +383,7 @@ impl LogicalPlan {
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
+            | LogicalPlan::Values { .. }
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
@@ -515,6 +530,7 @@ impl LogicalPlan {
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
+            | LogicalPlan::Values { .. }
             | LogicalPlan::CreateExternalTable { .. } => true,
         };
         if !recurse {
@@ -702,6 +718,25 @@ impl LogicalPlan {
             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                 match &*self.0 {
                     LogicalPlan::EmptyRelation { .. } => write!(f, 
"EmptyRelation"),
+                    LogicalPlan::Values { ref values, .. } => {
+                        let str_values: Vec<_> = values
+                            .iter()
+                            // limit to only 5 values to avoid horrible display
+                            .take(5)
+                            .map(|row| {
+                                let item = row
+                                    .iter()
+                                    .map(|expr| expr.to_string())
+                                    .collect::<Vec<_>>()
+                                    .join(", ");
+                                format!("({})", item)
+                            })
+                            .collect();
+
+                        let elipse = if values.len() > 5 { "..." } else { "" };
+                        write!(f, "Values: {}{}", str_values.join(", "), 
elipse)
+                    }
+
                     LogicalPlan::TableScan {
                         ref table_name,
                         ref projection,
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs 
b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 7192471..8d87b22 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props: 
&ExecutionProps) -> Result<Logi
         | LogicalPlan::Repartition { .. }
         | LogicalPlan::Union { .. }
         | LogicalPlan::TableScan { .. }
+        | LogicalPlan::Values { .. }
         | LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::Limit { .. }
         | LogicalPlan::CreateExternalTable { .. }
diff --git a/datafusion/src/optimizer/constant_folding.rs 
b/datafusion/src/optimizer/constant_folding.rs
index 4d8f06f..d67d7d1 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -77,6 +77,7 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Aggregate { .. }
             | LogicalPlan::Repartition { .. }
             | LogicalPlan::CreateExternalTable { .. }
+            | LogicalPlan::Values { .. }
             | LogicalPlan::Extension { .. }
             | LogicalPlan::Sort { .. }
             | LogicalPlan::Explain { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs 
b/datafusion/src/optimizer/projection_push_down.rs
index 58fde1d..2d66c53 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -431,6 +431,7 @@ fn optimize_plan(
         | LogicalPlan::Filter { .. }
         | LogicalPlan::Repartition { .. }
         | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::Values { .. }
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable { .. }
         | LogicalPlan::CrossJoin { .. }
diff --git a/datafusion/src/optimizer/utils.rs 
b/datafusion/src/optimizer/utils.rs
index 6e64bf3..b27de4b 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -124,6 +124,13 @@ pub fn from_plan(
             schema: schema.clone(),
             alias: alias.clone(),
         }),
+        LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values {
+            schema: schema.clone(),
+            values: expr
+                .chunks_exact(schema.fields().len())
+                .map(|s| s.to_vec())
+                .collect::<Vec<_>>(),
+        }),
         LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
             predicate: expr[0].clone(),
             input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index 3accaad..ef53d86 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -645,5 +645,6 @@ pub mod udf;
 #[cfg(feature = "unicode_expressions")]
 pub mod unicode_expressions;
 pub mod union;
+pub mod values;
 pub mod window_functions;
 pub mod windows;
diff --git a/datafusion/src/physical_plan/planner.rs 
b/datafusion/src/physical_plan/planner.rs
index be8c588..8cfb907 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -20,7 +20,7 @@
 use super::analyze::AnalyzeExec;
 use super::{
     aggregates, empty::EmptyExec, expressions::binary, functions,
-    hash_join::PartitionMode, udaf, union::UnionExec, windows,
+    hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, 
windows,
 };
 use crate::execution::context::ExecutionContextState;
 use crate::logical_plan::{
@@ -323,6 +323,30 @@ impl DefaultPhysicalPlanner {
                     let filters = unnormalize_cols(filters.iter().cloned());
                     source.scan(projection, batch_size, &filters, *limit).await
                 }
+                LogicalPlan::Values {
+                    values,
+                    schema,
+                } => {
+                    let exec_schema = schema.as_ref().to_owned().into();
+                    let exprs = values.iter()
+                        .map(|row| {
+                            row.iter().map(|expr|{
+                                self.create_physical_expr(
+                                    expr,
+                                    schema,
+                                    &exec_schema,
+                                    ctx_state,
+                                )
+                            })
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    let value_exec = ValuesExec::try_new(
+                        SchemaRef::new(exec_schema),
+                        exprs
+                    )?;
+                    Ok(Arc::new(value_exec))
+                }
                 LogicalPlan::Window {
                     input, window_expr, ..
                 } => {
diff --git a/datafusion/src/physical_plan/values.rs 
b/datafusion/src/physical_plan/values.rs
new file mode 100644
index 0000000..6658f67
--- /dev/null
+++ b/datafusion/src/physical_plan/values.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Values execution plan
+
+use super::{common, SendableRecordBatchStream, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, 
ExecutionPlan,
+    Partitioning, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Execution plan for values list based relation (produces constant rows)
+#[derive(Debug)]
+pub struct ValuesExec {
+    /// The schema
+    schema: SchemaRef,
+    /// The data
+    data: Vec<RecordBatch>,
+}
+
+impl ValuesExec {
+    /// create a new values exec from data as expr
+    pub fn try_new(
+        schema: SchemaRef,
+        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    ) -> Result<Self> {
+        if data.is_empty() {
+            return Err(DataFusionError::Plan("Values list cannot be 
empty".into()));
+        }
+        // we have this empty batch as a placeholder to satisfy evaluation 
argument
+        let batch = RecordBatch::new_empty(schema.clone());
+        let n_row = data.len();
+        let n_col = schema.fields().len();
+        let arr = (0..n_col)
+            .map(|j| {
+                (0..n_row)
+                    .map(|i| {
+                        let r = data[i][j].evaluate(&batch);
+                        match r {
+                            Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
+                            Ok(ColumnarValue::Array(_)) => 
Err(DataFusionError::Plan(
+                                "Cannot have array values in a values 
list".into(),
+                            )),
+                            Err(err) => Err(err),
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()
+                    .and_then(ScalarValue::iter_to_array)
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let batch = RecordBatch::try_new(schema.clone(), arr)?;
+        let data: Vec<RecordBatch> = vec![batch];
+        Ok(Self { schema, data })
+    }
+
+    /// provides the data
+    fn data(&self) -> Vec<RecordBatch> {
+        self.data.clone()
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for ValuesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            0 => Ok(Arc::new(ValuesExec {
+                schema: self.schema.clone(),
+                data: self.data.clone(),
+            })),
+            _ => Err(DataFusionError::Internal(
+                "ValuesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
+        // GlobalLimitExec has a single output partition
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "ValuesExec invalid partition {} (expected 0)",
+                partition
+            )));
+        }
+
+        Ok(Box::pin(MemoryStream::try_new(
+            self.data(),
+            self.schema.clone(),
+            None,
+        )?))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "ValuesExec")
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        let batch = self.data();
+        common::compute_record_batch_statistics(&[batch], &self.schema, None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test;
+
+    #[tokio::test]
+    async fn values_empty_case() -> Result<()> {
+        let schema = test::aggr_test_schema();
+        let empty = ValuesExec::try_new(schema, vec![]);
+        assert!(!empty.is_ok());
+        Ok(())
+    }
+}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 5c1b501..a16c40a 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -49,7 +49,7 @@ use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, 
FunctionArg,
     Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, 
SelectItem,
     SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
-    TrimWhereField, UnaryOperator, Value,
+    TrimWhereField, UnaryOperator, Value, Values as SQLValues,
 };
 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
 use sqlparser::ast::{OrderByExpr, Statement};
@@ -160,6 +160,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     ) -> Result<LogicalPlan> {
         match set_expr {
             SetExpr::Select(s) => self.select_to_plan(s.as_ref(), ctes, alias),
+            SetExpr::Values(v) => self.sql_values_to_plan(v),
             SetExpr::SetOperation {
                 op,
                 left,
@@ -1068,6 +1069,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
+    fn sql_values_to_plan(&self, values: &SQLValues) -> Result<LogicalPlan> {
+        let values = values
+            .0
+            .iter()
+            .map(|row| {
+                row.iter()
+                    .map(|v| match v {
+                        SQLExpr::Value(Value::Number(n, _)) => match 
n.parse::<i64>() {
+                            Ok(n) => Ok(lit(n)),
+                            Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
+                        },
+                        SQLExpr::Value(Value::SingleQuotedString(ref s)) => {
+                            Ok(lit(s.clone()))
+                        }
+                        SQLExpr::Value(Value::Null) => {
+                            Ok(Expr::Literal(ScalarValue::Utf8(None)))
+                        }
+                        SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
+                        other => Err(DataFusionError::NotImplemented(format!(
+                            "Unsupported value {:?} in a values list 
expression",
+                            other
+                        ))),
+                    })
+                    .collect::<Result<Vec<_>>>()
+            })
+            .collect::<Result<Vec<_>>>()?;
+        LogicalPlanBuilder::values(values)?.build()
+    }
+
     fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) -> 
Result<Expr> {
         match sql {
             SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 67270c5..4d299ec 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -477,6 +477,180 @@ async fn csv_query_group_by_float32() -> Result<()> {
 }
 
 #[tokio::test]
+async fn select_values_list() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    {
+        let sql = "VALUES (1)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+",
+            "| column1 |",
+            "+---------+",
+            "| 1       |",
+            "+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES ()";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES (1),(2)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+",
+            "| column1 |",
+            "+---------+",
+            "| 1       |",
+            "| 2       |",
+            "+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (1),()";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES (1,'a'),(2,'b')";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+",
+            "| column1 | column2 |",
+            "+---------+---------+",
+            "| 1       | a       |",
+            "| 2       | b       |",
+            "+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (1),(1,2)";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES (1),('2')";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES (1),(2.0)";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES (1,2), (1,'2')";
+        let plan = ctx.create_logical_plan(sql);
+        assert!(plan.is_err());
+    }
+    {
+        let sql = "VALUES (1,'a'),(NULL,'b'),(3,'c')";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+",
+            "| column1 | column2 |",
+            "+---------+---------+",
+            "| 1       | a       |",
+            "|         | b       |",
+            "| 3       | c       |",
+            "+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (NULL,'a'),(NULL,'b'),(3,'c')";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+",
+            "| column1 | column2 |",
+            "+---------+---------+",
+            "|         | a       |",
+            "|         | b       |",
+            "| 3       | c       |",
+            "+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (NULL,'a'),(NULL,'b'),(NULL,'c')";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+",
+            "| column1 | column2 |",
+            "+---------+---------+",
+            "|         | a       |",
+            "|         | b       |",
+            "|         | c       |",
+            "+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (1,'a'),(2,NULL),(3,'c')";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+",
+            "| column1 | column2 |",
+            "+---------+---------+",
+            "| 1       | a       |",
+            "| 2       |         |",
+            "| 3       | c       |",
+            "+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (1,NULL),(2,NULL),(3,'c')";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+",
+            "| column1 | column2 |",
+            "+---------+---------+",
+            "| 1       |         |",
+            "| 2       |         |",
+            "| 3       | c       |",
+            "+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (1,2,3,4,5,6,7,8,9,10,11,12,13,NULL,'F',3.5)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            
"+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+",
+            "| column1 | column2 | column3 | column4 | column5 | column6 | 
column7 | column8 | column9 | column10 | column11 | column12 | column13 | 
column14 | column15 | column16 |",
+            
"+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+",
+            "| 1       | 2       | 3       | 4       | 5       | 6       | 7   
    | 8       | 9       | 10       | 11       | 12       | 13       |          
| F        | 3.5      |",
+            
"+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "SELECT * FROM (VALUES (1,'a'),(2,NULL)) AS t(c1, c2)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----+----+",
+            "| c1 | c2 |",
+            "+----+----+",
+            "| 1  | a  |",
+            "| 2  |    |",
+            "+----+----+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    Ok(())
+}
+
+#[tokio::test]
 async fn select_all() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_simple_csv(&mut ctx).await?;
@@ -598,7 +772,7 @@ async fn select_distinct_simple_4() {
 async fn select_distinct_from() {
     let mut ctx = ExecutionContext::new();
 
-    let sql = "select 
+    let sql = "select
         1 IS DISTINCT FROM CAST(NULL as INT) as a,
         1 IS DISTINCT FROM 1 as b,
         1 IS NOT DISTINCT FROM CAST(NULL as INT) as c,
@@ -621,7 +795,7 @@ async fn select_distinct_from() {
 async fn select_distinct_from_utf8() {
     let mut ctx = ExecutionContext::new();
 
-    let sql = "select 
+    let sql = "select
         'x' IS DISTINCT FROM NULL as a,
         'x' IS DISTINCT FROM 'x' as b,
         'x' IS NOT DISTINCT FROM NULL as c,

Reply via email to