This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dbda1e2cf feat: support customizing column default values for inserting (#8283)
3dbda1e2cf is described below

commit 3dbda1e2cfdbfb974c268887294e6cf3de350f71
Author: Jonah Gao <[email protected]>
AuthorDate: Wed Nov 22 19:49:00 2023 +0800

    feat: support customizing column default values for inserting (#8283)
    
    * parse column default values
    
    * fix clippy
    
    * Impl for memory table
    
    * Add tests
    
    * Add test
    
    * Use plan_datafusion_err
    
    * Add comment
    
    * Update datafusion/sql/src/planner.rs
    
    Co-authored-by: comphead <[email protected]>
    
    * Fix ci
    
    ---------
    
    Co-authored-by: comphead <[email protected]>
---
 .../core/src/datasource/default_table_source.rs    |  4 ++
 datafusion/core/src/datasource/memory.rs           | 16 ++++++
 datafusion/core/src/datasource/provider.rs         |  5 ++
 datafusion/core/src/execution/context/mod.rs       | 14 +++--
 datafusion/expr/src/logical_plan/ddl.rs            |  2 +
 datafusion/expr/src/logical_plan/plan.rs           |  2 +
 datafusion/expr/src/table_source.rs                |  5 ++
 datafusion/sql/src/planner.rs                      | 41 +++++++++++++-
 datafusion/sql/src/query.rs                        |  1 +
 datafusion/sql/src/statement.rs                    | 15 +++++-
 datafusion/sqllogictest/test_files/insert.slt      | 62 ++++++++++++++++++++++
 11 files changed, 160 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index 00a9c123ce..fadf01c74c 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -73,6 +73,10 @@ impl TableSource for DefaultTableSource {
     fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
         self.table_provider.get_logical_plan()
     }
+
+    fn get_column_default(&self, column: &str) -> Option<&Expr> {
+        self.table_provider.get_column_default(column)
+    }
 }
 
 /// Wrap TableProvider in TableSource
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 6bcaa97a40..a841518d9c 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -19,6 +19,7 @@
 
 use datafusion_physical_plan::metrics::MetricsSet;
 use futures::StreamExt;
+use hashbrown::HashMap;
 use log::debug;
 use std::any::Any;
 use std::fmt::{self, Debug};
@@ -56,6 +57,7 @@ pub struct MemTable {
     schema: SchemaRef,
     pub(crate) batches: Vec<PartitionData>,
     constraints: Constraints,
+    column_defaults: HashMap<String, Expr>,
 }
 
 impl MemTable {
@@ -79,6 +81,7 @@ impl MemTable {
                 .map(|e| Arc::new(RwLock::new(e)))
                 .collect::<Vec<_>>(),
             constraints: Constraints::empty(),
+            column_defaults: HashMap::new(),
         })
     }
 
@@ -88,6 +91,15 @@ impl MemTable {
         self
     }
 
+    /// Assign column defaults
+    pub fn with_column_defaults(
+        mut self,
+        column_defaults: HashMap<String, Expr>,
+    ) -> Self {
+        self.column_defaults = column_defaults;
+        self
+    }
+
     /// Create a mem table by reading from another data source
     pub async fn load(
         t: Arc<dyn TableProvider>,
@@ -228,6 +240,10 @@ impl TableProvider for MemTable {
             None,
         )))
     }
+
+    fn get_column_default(&self, column: &str) -> Option<&Expr> {
+        self.column_defaults.get(column)
+    }
 }
 
 /// Implements for writing to a [`MemTable`]
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 4fe433044e..275523405a 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -66,6 +66,11 @@ pub trait TableProvider: Sync + Send {
         None
     }
 
+    /// Get the default value for a column, if available.
+    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
+        None
+    }
+
     /// Create an [`ExecutionPlan`] for scanning the table with optionally
     /// specified `projection`, `filter` and `limit`, described below.
     ///
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index f829092570..46388f990a 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -529,6 +529,7 @@ impl SessionContext {
             if_not_exists,
             or_replace,
             constraints,
+            column_defaults,
         } = cmd;
 
         let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
@@ -542,7 +543,12 @@ impl SessionContext {
                 let physical = DataFrame::new(self.state(), input);
 
                 let batches: Vec<_> = physical.collect_partitioned().await?;
-                let table = Arc::new(MemTable::try_new(schema, batches)?);
+                let table = Arc::new(
+                    // pass constraints and column defaults to the mem table.
+                    MemTable::try_new(schema, batches)?
+                        .with_constraints(constraints)
+                        .with_column_defaults(column_defaults.into_iter().collect()),
+                );
 
                 self.register_table(&name, table)?;
                 self.return_empty_dataframe()
@@ -557,8 +563,10 @@ impl SessionContext {
 
                 let batches: Vec<_> = physical.collect_partitioned().await?;
                 let table = Arc::new(
-                    // pass constraints to the mem table.
-                    MemTable::try_new(schema, batches)?.with_constraints(constraints),
+                    // pass constraints and column defaults to the mem table.
+                    MemTable::try_new(schema, batches)?
+                        .with_constraints(constraints)
+                        .with_column_defaults(column_defaults.into_iter().collect()),
                 );
 
                 self.register_table(&name, table)?;
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 2c90a3aca7..97551a941a 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -228,6 +228,8 @@ pub struct CreateMemoryTable {
     pub if_not_exists: bool,
     /// Option to replace table content if table already exists
     pub or_replace: bool,
+    /// Default values for columns
+    pub column_defaults: Vec<(String, Expr)>,
 }
 
 /// Creates a view.
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index a024824c7a..69ba42d34a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -811,6 +811,7 @@ impl LogicalPlan {
                 name,
                 if_not_exists,
                 or_replace,
+                column_defaults,
                 ..
             })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                 CreateMemoryTable {
@@ -819,6 +820,7 @@ impl LogicalPlan {
                     name: name.clone(),
                     if_not_exists: *if_not_exists,
                     or_replace: *or_replace,
+                    column_defaults: column_defaults.clone(),
                 },
             ))),
             LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index 94f26d9158..565f48c1c5 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -103,4 +103,9 @@ pub trait TableSource: Sync + Send {
     fn get_logical_plan(&self) -> Option<&LogicalPlan> {
         None
     }
+
+    /// Get the default value for a column, if available.
+    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
+        None
+    }
 }
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index ca5e260aee..622e5aca79 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -21,8 +21,9 @@ use std::sync::Arc;
 use std::vec;
 
 use arrow_schema::*;
-use datafusion_common::field_not_found;
-use datafusion_common::internal_err;
+use datafusion_common::{
+    field_not_found, internal_err, plan_datafusion_err, SchemaError,
+};
 use datafusion_expr::WindowUDF;
 use sqlparser::ast::TimezoneInfo;
 use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo};
@@ -230,6 +231,42 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         Ok(Schema::new(fields))
     }
 
+    /// Returns a vector of (column_name, default_expr) pairs
+    pub(super) fn build_column_defaults(
+        &self,
+        columns: &Vec<SQLColumnDef>,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Vec<(String, Expr)>> {
+        let mut column_defaults = vec![];
+        // Default expressions are restricted, column references are not allowed
+        let empty_schema = DFSchema::empty();
+        let error_desc = |e: DataFusionError| match e {
+            DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }) => {
+                plan_datafusion_err!(
+                    "Column reference is not allowed in the DEFAULT expression : {}",
+                    e
+                )
+            }
+            _ => e,
+        };
+
+        for column in columns {
+            if let Some(default_sql_expr) =
+                column.options.iter().find_map(|o| match &o.option {
+                    ColumnOption::Default(expr) => Some(expr),
+                    _ => None,
+                })
+            {
+                let default_expr = self
+                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
+                    .map_err(error_desc)?;
+                column_defaults
+                    .push((self.normalizer.normalize(column.name.clone()), default_expr));
+            }
+        }
+        Ok(column_defaults)
+    }
+
     /// Apply the given TableAlias to the input plan
     pub(crate) fn apply_table_alias(
         &self,
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index 832e2da9c6..643f41d844 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -90,6 +90,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     input: Arc::new(plan),
                     if_not_exists: false,
                     or_replace: false,
+                    column_defaults: vec![],
                 }))
             }
             _ => plan,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 49755729d2..aa2f0583cb 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -204,6 +204,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let mut all_constraints = constraints;
                 let inline_constraints = calc_inline_constraints_from_columns(&columns);
                 all_constraints.extend(inline_constraints);
+                // Build column default values
+                let column_defaults =
+                    self.build_column_defaults(&columns, planner_context)?;
                 match query {
                     Some(query) => {
                         let plan = self.query_to_plan(*query, planner_context)?;
@@ -250,6 +253,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                                 input: Arc::new(plan),
                                 if_not_exists,
                                 or_replace,
+                                column_defaults,
                             },
                         )))
                     }
@@ -272,6 +276,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                                 input: Arc::new(plan),
                                 if_not_exists,
                                 or_replace,
+                                column_defaults,
                             },
                         )))
                     }
@@ -1170,8 +1175,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         
datafusion_expr::Expr::Column(source_field.qualified_column())
                             .cast_to(target_field.data_type(), source.schema())?
                     }
-                    // Fill the default value for the column, currently only supports NULL.
-                    None => datafusion_expr::Expr::Literal(ScalarValue::Null)
+                    // The value is not specified. Fill in the default value for the column.
+                    None => table_source
+                        .get_column_default(target_field.name())
+                        .cloned()
+                        .unwrap_or_else(|| {
+                            // If there is no default for the column, then the default is NULL
+                            datafusion_expr::Expr::Literal(ScalarValue::Null)
+                        })
                         .cast_to(target_field.data_type(), &DFSchema::empty())?,
                 };
                 Ok(expr.alias(target_field.name()))
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index aacd227cdb..9734aab9ab 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -350,3 +350,65 @@ insert into bad_new_empty_table values (1);
 
 statement ok
 drop table bad_new_empty_table;
+
+
+### Test for specifying column's default value
+
+statement ok
+create table test_column_defaults(
+  a int,
+  b int not null default null,
+  c int default 100*2+300, 
+  d text default lower('DEFAULT_TEXT'),
+  e timestamp default now()
+)
+
+query IIITP
+insert into test_column_defaults values(1, 10, 100, 'ABC', now())
+----
+1
+
+statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
+insert into test_column_defaults(a) values(2)
+
+query IIITP
+insert into test_column_defaults(b) values(20)
+----
+1
+
+query IIIT rowsort
+select a,b,c,d from test_column_defaults
+----
+1 10 100 ABC
+NULL 20 500 default_text
+
+statement ok
+drop table test_column_defaults
+
+
+# test create table as 
+statement ok
+create table test_column_defaults(
+  a int,
+  b int not null default null,
+  c int default 100*2+300, 
+  d text default lower('DEFAULT_TEXT'),
+  e timestamp default now()
+) as values(1, 10, 100, 'ABC', now())
+
+query IIITP
+insert into test_column_defaults(b) values(20)
+----
+1
+
+query IIIT rowsort
+select a,b,c,d from test_column_defaults
+----
+1 10 100 ABC
+NULL 20 500 default_text
+
+statement ok
+drop table test_column_defaults
+
+statement error DataFusion error: Error during planning: Column reference is not allowed in the DEFAULT expression : Schema error: No field named a.
+create table test_column_defaults(a int, b int default a+1)

Reply via email to