This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3c1b807 add values list expression (#1165)
3c1b807 is described below
commit 3c1b8079015e8171fa0db2476e1072084027256a
Author: Jiayu Liu <[email protected]>
AuthorDate: Sat Oct 23 21:10:53 2021 +0800
add values list expression (#1165)
* add values list expression
* apply formatting
---
ballista/rust/core/proto/ballista.proto | 18 ++-
.../rust/core/src/serde/logical_plan/from_proto.rs | 25 +++
.../rust/core/src/serde/logical_plan/to_proto.rs | 20 +++
datafusion/src/logical_plan/builder.rs | 94 +++++++++--
datafusion/src/logical_plan/plan.rs | 35 ++++
.../src/optimizer/common_subexpr_eliminate.rs | 1 +
datafusion/src/optimizer/constant_folding.rs | 1 +
datafusion/src/optimizer/projection_push_down.rs | 1 +
datafusion/src/optimizer/utils.rs | 7 +
datafusion/src/physical_plan/mod.rs | 1 +
datafusion/src/physical_plan/planner.rs | 26 ++-
datafusion/src/physical_plan/values.rs | 168 +++++++++++++++++++
datafusion/src/sql/planner.rs | 32 +++-
datafusion/tests/sql.rs | 178 ++++++++++++++++++++-
14 files changed, 588 insertions(+), 19 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 338c5a6..95b78fc 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -255,6 +255,7 @@ message LogicalPlanNode {
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
+ ValuesNode values = 16;
}
}
@@ -316,12 +317,12 @@ message SelectionNode {
LogicalExprNode expr = 2;
}
-message SortNode{
+message SortNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
}
-message RepartitionNode{
+message RepartitionNode {
LogicalPlanNode input = 1;
oneof partition_method {
uint64 round_robin = 2;
@@ -334,11 +335,11 @@ message HashRepartition {
uint64 partition_count = 2;
}
-message EmptyRelationNode{
+message EmptyRelationNode {
bool produce_one_row = 1;
}
-message CreateExternalTableNode{
+message CreateExternalTableNode {
string name = 1;
string location = 2;
FileType file_type = 3;
@@ -346,7 +347,14 @@ message CreateExternalTableNode{
DfSchema schema = 5;
}
-enum FileType{
+// a node containing data for defining a values list. unlike in SQL where it's two dimensional, here
+// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
+message ValuesNode {
+ uint64 n_cols = 1;
+ repeated LogicalExprNode values_list = 2;
+}
+
+enum FileType {
NdJson = 0;
Parquet = 1;
CSV = 2;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 07eced7..26231c5 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -60,6 +60,31 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
))
})?;
match plan {
+ LogicalPlanType::Values(values) => {
+ let n_cols = values.n_cols as usize;
+ let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
+ Ok(Vec::new())
+ } else if values.values_list.len() % n_cols != 0 {
+ Err(BallistaError::General(format!(
+ "Invalid values list length, expect {} to be divisible
by {}",
+ values.values_list.len(),
+ n_cols
+ )))
+ } else {
+ values
+ .values_list
+ .chunks_exact(n_cols)
+ .map(|r| {
+ r.iter()
+ .map(|v| v.try_into())
+ .collect::<Result<Vec<_>, _>>()
+ })
+ .collect::<Result<Vec<_>, _>>()
+ }?;
+ LogicalPlanBuilder::values(values)?
+ .build()
+ .map_err(|e| e.into())
+ }
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan =
convert_box_required!(projection.input)?;
let x: Vec<Expr> = projection
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index e79e654..ae25d72 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -675,6 +675,26 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
fn try_into(self) -> Result<protobuf::LogicalPlanNode, Self::Error> {
use protobuf::logical_plan_node::LogicalPlanType;
match self {
+ LogicalPlan::Values { values, .. } => {
+ let n_cols = if values.is_empty() {
+ 0
+ } else {
+ values[0].len()
+ } as u64;
+ let values_list = values
+ .iter()
+ .flatten()
+ .map(|v| v.try_into())
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::Values(
+ protobuf::ValuesNode {
+ n_cols,
+ values_list,
+ },
+ )),
+ })
+ }
LogicalPlan::TableScan {
table_name,
source,
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index 3a1d127..3c6c444 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -17,16 +17,6 @@
//! This module provides a builder for creating LogicalPlans
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
-};
-
-use arrow::{
- datatypes::{Schema, SchemaRef},
- record_batch::RecordBatch,
-};
-
use crate::datasource::{
empty::EmptyTable,
file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
@@ -37,6 +27,16 @@ use crate::datasource::{
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::ToStringifiedPlan;
use crate::prelude::*;
+use crate::scalar::ScalarValue;
+use arrow::{
+ datatypes::{DataType, Schema, SchemaRef},
+ record_batch::RecordBatch,
+};
+use std::convert::TryFrom;
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+};
use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan,
PlanType};
@@ -111,6 +111,80 @@ impl LogicalPlanBuilder {
})
}
+ /// Create a values list based relation, and the schema is inferred from data, consuming
+ /// `values`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+ /// documentation for more details.
+ ///
+ /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
+ /// The column names are not specified by the SQL standard and different database systems do it differently,
+ /// so it's usually better to override the default names with a table alias list.
+ pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
+ if values.is_empty() {
+ return Err(DataFusionError::Plan("Values list cannot be
empty".into()));
+ }
+ let n_cols = values[0].len();
+ if n_cols == 0 {
+ return Err(DataFusionError::Plan(
+ "Values list cannot be zero length".into(),
+ ));
+ }
+ let empty_schema = DFSchema::empty();
+ let mut field_types: Vec<Option<DataType>> =
Vec::with_capacity(n_cols);
+ for _ in 0..n_cols {
+ field_types.push(None);
+ }
+ // hold all the null holes so that we can correct their data types later
+ let mut nulls: Vec<(usize, usize)> = Vec::new();
+ for (i, row) in values.iter().enumerate() {
+ if row.len() != n_cols {
+ return Err(DataFusionError::Plan(format!(
+ "Inconsistent data length across values list: got {}
values in row {} but expected {}",
+ row.len(),
+ i,
+ n_cols
+ )));
+ }
+ field_types = row
+ .iter()
+ .enumerate()
+ .map(|(j, expr)| {
+ if let Expr::Literal(ScalarValue::Utf8(None)) = expr {
+ nulls.push((i, j));
+ Ok(field_types[j].clone())
+ } else {
+ let data_type = expr.get_type(&empty_schema)?;
+ if let Some(prev_data_type) = &field_types[j] {
+ if prev_data_type != &data_type {
+ let err = format!("Inconsistent data type
across values list at row {} column {}", i, j);
+ return Err(DataFusionError::Plan(err));
+ }
+ }
+ Ok(Some(data_type))
+ }
+ })
+ .collect::<Result<Vec<Option<DataType>>>>()?;
+ }
+ let fields = field_types
+ .iter()
+ .enumerate()
+ .map(|(j, data_type)| {
+ // naming follows the Postgres convention: https://www.postgresql.org/docs/current/queries-values.html
+ let name = &format!("column{}", j + 1);
+ DFField::new(
+ None,
+ name,
+ data_type.clone().unwrap_or(DataType::Utf8),
+ true,
+ )
+ })
+ .collect::<Vec<_>>();
+ for (i, j) in nulls {
+ values[i][j] =
Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
+ }
+ let schema = DFSchemaRef::new(DFSchema::new(fields)?);
+ Ok(Self::from(LogicalPlan::Values { schema, values }))
+ }
+
/// Scan a memory data source
pub fn scan_memory(
partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/logical_plan/plan.rs
b/datafusion/src/logical_plan/plan.rs
index 7552fc6..13921d5 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -203,6 +203,15 @@ pub enum LogicalPlan {
/// Whether the CSV file contains a header
has_header: bool,
},
+ /// Values expression. See
+ /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+ /// documentation for more details.
+ Values {
+ /// The table schema
+ schema: DFSchemaRef,
+ /// Values
+ values: Vec<Vec<Expr>>,
+ },
/// Produces a relation with string representations of
/// various parts of the plan
Explain {
@@ -237,6 +246,7 @@ impl LogicalPlan {
pub fn schema(&self) -> &DFSchemaRef {
match self {
LogicalPlan::EmptyRelation { schema, .. } => schema,
+ LogicalPlan::Values { schema, .. } => schema,
LogicalPlan::TableScan {
projected_schema, ..
} => projected_schema,
@@ -263,6 +273,7 @@ impl LogicalPlan {
LogicalPlan::TableScan {
projected_schema, ..
} => vec![projected_schema],
+ LogicalPlan::Values { schema, .. } => vec![schema],
LogicalPlan::Window { input, schema, .. }
| LogicalPlan::Aggregate { input, schema, .. }
| LogicalPlan::Projection { input, schema, .. } => {
@@ -315,6 +326,9 @@ impl LogicalPlan {
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
LogicalPlan::Projection { expr, .. } => expr.clone(),
+ LogicalPlan::Values { values, .. } => {
+ values.iter().flatten().cloned().collect()
+ }
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
LogicalPlan::Repartition {
partitioning_scheme,
@@ -369,6 +383,7 @@ impl LogicalPlan {
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
+ | LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => vec![],
}
}
@@ -515,6 +530,7 @@ impl LogicalPlan {
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
+ | LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => true,
};
if !recurse {
@@ -702,6 +718,25 @@ impl LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self.0 {
LogicalPlan::EmptyRelation { .. } => write!(f,
"EmptyRelation"),
+ LogicalPlan::Values { ref values, .. } => {
+ let str_values: Vec<_> = values
+ .iter()
+ // limit to only 5 values to avoid horrible display
+ .take(5)
+ .map(|row| {
+ let item = row
+ .iter()
+ .map(|expr| expr.to_string())
+ .collect::<Vec<_>>()
+ .join(", ");
+ format!("({})", item)
+ })
+ .collect();
+
+ let elipse = if values.len() > 5 { "..." } else { "" };
+ write!(f, "Values: {}{}", str_values.join(", "),
elipse)
+ }
+
LogicalPlan::TableScan {
ref table_name,
ref projection,
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs
b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 7192471..8d87b22 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
| LogicalPlan::Repartition { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::TableScan { .. }
+ | LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
diff --git a/datafusion/src/optimizer/constant_folding.rs
b/datafusion/src/optimizer/constant_folding.rs
index 4d8f06f..d67d7d1 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -77,6 +77,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs
b/datafusion/src/optimizer/projection_push_down.rs
index 58fde1d..2d66c53 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -431,6 +431,7 @@ fn optimize_plan(
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::EmptyRelation { .. }
+ | LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CrossJoin { .. }
diff --git a/datafusion/src/optimizer/utils.rs
b/datafusion/src/optimizer/utils.rs
index 6e64bf3..b27de4b 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -124,6 +124,13 @@ pub fn from_plan(
schema: schema.clone(),
alias: alias.clone(),
}),
+ LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values {
+ schema: schema.clone(),
+ values: expr
+ .chunks_exact(schema.fields().len())
+ .map(|s| s.to_vec())
+ .collect::<Vec<_>>(),
+ }),
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
predicate: expr[0].clone(),
input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/src/physical_plan/mod.rs
b/datafusion/src/physical_plan/mod.rs
index 3accaad..ef53d86 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -645,5 +645,6 @@ pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;
+pub mod values;
pub mod window_functions;
pub mod windows;
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index be8c588..8cfb907 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -20,7 +20,7 @@
use super::analyze::AnalyzeExec;
use super::{
aggregates, empty::EmptyExec, expressions::binary, functions,
- hash_join::PartitionMode, udaf, union::UnionExec, windows,
+ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec,
windows,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
@@ -323,6 +323,30 @@ impl DefaultPhysicalPlanner {
let filters = unnormalize_cols(filters.iter().cloned());
source.scan(projection, batch_size, &filters, *limit).await
}
+ LogicalPlan::Values {
+ values,
+ schema,
+ } => {
+ let exec_schema = schema.as_ref().to_owned().into();
+ let exprs = values.iter()
+ .map(|row| {
+ row.iter().map(|expr|{
+ self.create_physical_expr(
+ expr,
+ schema,
+ &exec_schema,
+ ctx_state,
+ )
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let value_exec = ValuesExec::try_new(
+ SchemaRef::new(exec_schema),
+ exprs
+ )?;
+ Ok(Arc::new(value_exec))
+ }
LogicalPlan::Window {
input, window_expr, ..
} => {
diff --git a/datafusion/src/physical_plan/values.rs
b/datafusion/src/physical_plan/values.rs
new file mode 100644
index 0000000..6658f67
--- /dev/null
+++ b/datafusion/src/physical_plan/values.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Values execution plan
+
+use super::{common, SendableRecordBatchStream, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+ memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution,
ExecutionPlan,
+ Partitioning, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Execution plan for values list based relation (produces constant rows)
+#[derive(Debug)]
+pub struct ValuesExec {
+ /// The schema
+ schema: SchemaRef,
+ /// The data
+ data: Vec<RecordBatch>,
+}
+
+impl ValuesExec {
+ /// create a new values exec from data as expr
+ pub fn try_new(
+ schema: SchemaRef,
+ data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+ ) -> Result<Self> {
+ if data.is_empty() {
+ return Err(DataFusionError::Plan("Values list cannot be
empty".into()));
+ }
+ // we have this empty batch as a placeholder to satisfy the evaluation argument
+ let batch = RecordBatch::new_empty(schema.clone());
+ let n_row = data.len();
+ let n_col = schema.fields().len();
+ let arr = (0..n_col)
+ .map(|j| {
+ (0..n_row)
+ .map(|i| {
+ let r = data[i][j].evaluate(&batch);
+ match r {
+ Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
+ Ok(ColumnarValue::Array(_)) =>
Err(DataFusionError::Plan(
+ "Cannot have array values in a values
list".into(),
+ )),
+ Err(err) => Err(err),
+ }
+ })
+ .collect::<Result<Vec<_>>>()
+ .and_then(ScalarValue::iter_to_array)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let batch = RecordBatch::try_new(schema.clone(), arr)?;
+ let data: Vec<RecordBatch> = vec![batch];
+ Ok(Self { schema, data })
+ }
+
+ /// provides the data
+ fn data(&self) -> Vec<RecordBatch> {
+ self.data.clone()
+ }
+}
+
+#[async_trait]
+impl ExecutionPlan for ValuesExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn required_child_distribution(&self) -> Distribution {
+ Distribution::UnspecifiedDistribution
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ fn with_new_children(
+ &self,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ match children.len() {
+ 0 => Ok(Arc::new(ValuesExec {
+ schema: self.schema.clone(),
+ data: self.data.clone(),
+ })),
+ _ => Err(DataFusionError::Internal(
+ "ValuesExec wrong number of children".to_string(),
+ )),
+ }
+ }
+
+ async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
+ // ValuesExec has a single output partition
+ if 0 != partition {
+ return Err(DataFusionError::Internal(format!(
+ "ValuesExec invalid partition {} (expected 0)",
+ partition
+ )));
+ }
+
+ Ok(Box::pin(MemoryStream::try_new(
+ self.data(),
+ self.schema.clone(),
+ None,
+ )?))
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "ValuesExec")
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ let batch = self.data();
+ common::compute_record_batch_statistics(&[batch], &self.schema, None)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test;
+
+ #[tokio::test]
+ async fn values_empty_case() -> Result<()> {
+ let schema = test::aggr_test_schema();
+ let empty = ValuesExec::try_new(schema, vec![]);
+ assert!(!empty.is_ok());
+ Ok(())
+ }
+}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 5c1b501..a16c40a 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -49,7 +49,7 @@ use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr,
FunctionArg,
Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select,
SelectItem,
SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
- TrimWhereField, UnaryOperator, Value,
+ TrimWhereField, UnaryOperator, Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{OrderByExpr, Statement};
@@ -160,6 +160,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<LogicalPlan> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(s.as_ref(), ctes, alias),
+ SetExpr::Values(v) => self.sql_values_to_plan(v),
SetExpr::SetOperation {
op,
left,
@@ -1068,6 +1069,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
+ fn sql_values_to_plan(&self, values: &SQLValues) -> Result<LogicalPlan> {
+ let values = values
+ .0
+ .iter()
+ .map(|row| {
+ row.iter()
+ .map(|v| match v {
+ SQLExpr::Value(Value::Number(n, _)) => match
n.parse::<i64>() {
+ Ok(n) => Ok(lit(n)),
+ Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
+ },
+ SQLExpr::Value(Value::SingleQuotedString(ref s)) => {
+ Ok(lit(s.clone()))
+ }
+ SQLExpr::Value(Value::Null) => {
+ Ok(Expr::Literal(ScalarValue::Utf8(None)))
+ }
+ SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
+ other => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported value {:?} in a values list
expression",
+ other
+ ))),
+ })
+ .collect::<Result<Vec<_>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+ LogicalPlanBuilder::values(values)?.build()
+ }
+
fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) ->
Result<Expr> {
match sql {
SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 67270c5..4d299ec 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -477,6 +477,180 @@ async fn csv_query_group_by_float32() -> Result<()> {
}
#[tokio::test]
+async fn select_values_list() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ {
+ let sql = "VALUES (1)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES ()";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),(2)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "| 2 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1),()";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,'a'),(2,'b')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | a |",
+ "| 2 | b |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1),(1,2)";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),('2')";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1),(2.0)";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,2), (1,'2')";
+ let plan = ctx.create_logical_plan(sql);
+ assert!(plan.is_err());
+ }
+ {
+ let sql = "VALUES (1,'a'),(NULL,'b'),(3,'c')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | a |",
+ "| | b |",
+ "| 3 | c |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (NULL,'a'),(NULL,'b'),(3,'c')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| | a |",
+ "| | b |",
+ "| 3 | c |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (NULL,'a'),(NULL,'b'),(NULL,'c')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| | a |",
+ "| | b |",
+ "| | c |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1,'a'),(2,NULL),(3,'c')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | a |",
+ "| 2 | |",
+ "| 3 | c |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1,NULL),(2,NULL),(3,'c')";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 1 | |",
+ "| 2 | |",
+ "| 3 | c |",
+ "+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (1,2,3,4,5,6,7,8,9,10,11,12,13,NULL,'F',3.5)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+
"+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+",
+ "| column1 | column2 | column3 | column4 | column5 | column6 |
column7 | column8 | column9 | column10 | column11 | column12 | column13 |
column14 | column15 | column16 |",
+
"+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+",
+ "| 1 | 2 | 3 | 4 | 5 | 6 | 7
| 8 | 9 | 10 | 11 | 12 | 13 |
| F | 3.5 |",
+
"+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "SELECT * FROM (VALUES (1,'a'),(2,NULL)) AS t(c1, c2)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| 1 | a |",
+ "| 2 | |",
+ "+----+----+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ Ok(())
+}
+
+#[tokio::test]
async fn select_all() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_simple_csv(&mut ctx).await?;
@@ -598,7 +772,7 @@ async fn select_distinct_simple_4() {
async fn select_distinct_from() {
let mut ctx = ExecutionContext::new();
- let sql = "select
+ let sql = "select
1 IS DISTINCT FROM CAST(NULL as INT) as a,
1 IS DISTINCT FROM 1 as b,
1 IS NOT DISTINCT FROM CAST(NULL as INT) as c,
@@ -621,7 +795,7 @@ async fn select_distinct_from() {
async fn select_distinct_from_utf8() {
let mut ctx = ExecutionContext::new();
- let sql = "select
+ let sql = "select
'x' IS DISTINCT FROM NULL as a,
'x' IS DISTINCT FROM 'x' as b,
'x' IS NOT DISTINCT FROM NULL as c,