This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fa8a0d9fd6 feat: customize column default values for external tables (#8415)
fa8a0d9fd6 is described below

commit fa8a0d9fd609efd24e866a191339075f169f90bd
Author: Jonah Gao <[email protected]>
AuthorDate: Thu Dec 7 05:55:13 2023 +0800

    feat: customize column default values for external tables (#8415)
    
    * feat: customize column default values for external tables
    
    * fix test
    
    * tests from reviewing
---
 datafusion/core/src/catalog/listing_schema.rs      |  1 +
 datafusion/core/src/datasource/listing/table.rs    | 16 ++++++
 .../core/src/datasource/listing_table_factory.rs   |  4 +-
 datafusion/core/src/datasource/memory.rs           |  2 +-
 datafusion/expr/src/logical_plan/ddl.rs            |  2 +
 datafusion/proto/proto/datafusion.proto            |  1 +
 datafusion/proto/src/generated/pbjson.rs           | 20 +++++++
 datafusion/proto/src/generated/prost.rs            |  5 ++
 datafusion/proto/src/logical_plan/mod.rs           | 17 ++++++
 .../proto/tests/cases/roundtrip_logical_plan.rs    | 11 ++--
 datafusion/sql/src/statement.rs                    | 10 +++-
 datafusion/sqllogictest/test_files/insert.slt      | 21 +++++++
 .../sqllogictest/test_files/insert_to_external.slt | 67 ++++++++++++++++++++++
 13 files changed, 169 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index 0d5c49f377..c3c6826895 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -149,6 +149,7 @@ impl ListingSchemaProvider {
                             unbounded: false,
                             options: Default::default(),
                             constraints: Constraints::empty(),
+                            column_defaults: Default::default(),
                         },
                     )
                     .await?;
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a3be57db3a..effeacc480 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,6 +17,7 @@
 
 //! The table implementation.
 
+use std::collections::HashMap;
 use std::str::FromStr;
 use std::{any::Any, sync::Arc};
 
@@ -558,6 +559,7 @@ pub struct ListingTable {
     collected_statistics: FileStatisticsCache,
     infinite_source: bool,
     constraints: Constraints,
+    column_defaults: HashMap<String, Expr>,
 }
 
 impl ListingTable {
@@ -596,6 +598,7 @@ impl ListingTable {
             collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
             infinite_source,
             constraints: Constraints::empty(),
+            column_defaults: HashMap::new(),
         };
 
         Ok(table)
@@ -607,6 +610,15 @@ impl ListingTable {
         self
     }
 
+    /// Assign column defaults
+    pub fn with_column_defaults(
+        mut self,
+        column_defaults: HashMap<String, Expr>,
+    ) -> Self {
+        self.column_defaults = column_defaults;
+        self
+    }
+
     /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
     ///
     /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
@@ -844,6 +856,10 @@ impl TableProvider for ListingTable {
             .create_writer_physical_plan(input, state, config, order_requirements)
             .await
     }
+
+    fn get_column_default(&self, column: &str) -> Option<&Expr> {
+        self.column_defaults.get(column)
+    }
 }
 
 impl ListingTable {
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index f70a820351..96436306c6 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -228,7 +228,8 @@ impl TableProviderFactory for ListingTableFactory {
             .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
         let table = provider
             .with_definition(cmd.definition.clone())
-            .with_constraints(cmd.constraints.clone());
+            .with_constraints(cmd.constraints.clone())
+            .with_column_defaults(cmd.column_defaults.clone());
         Ok(Arc::new(table))
     }
 }
@@ -279,6 +280,7 @@ mod tests {
             unbounded: false,
             options: HashMap::new(),
             constraints: Constraints::empty(),
+            column_defaults: HashMap::new(),
         };
         let table_provider = factory.create(&state, &cmd).await.unwrap();
         let listing_table = table_provider
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index a841518d9c..7c044b2936 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -19,9 +19,9 @@
 
 use datafusion_physical_plan::metrics::MetricsSet;
 use futures::StreamExt;
-use hashbrown::HashMap;
 use log::debug;
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 97551a941a..e74992d993 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -194,6 +194,8 @@ pub struct CreateExternalTable {
     pub options: HashMap<String, String>,
     /// The list of constraints in the schema, such as primary key, unique, etc.
     pub constraints: Constraints,
+    /// Default values for columns
+    pub column_defaults: HashMap<String, Expr>,
 }
 
 // Hashing refers to a subset of fields considered in PartialEq.
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index e46e70a139..64b8e28074 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -216,6 +216,7 @@ message CreateExternalTableNode {
   bool unbounded = 14;
   map<string, string> options = 11;
   Constraints constraints = 15;
+  map<string, LogicalExprNode> column_defaults = 16;
 }
 
 message PrepareNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index a1c1775419..34ad63d819 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4026,6 +4026,9 @@ impl serde::Serialize for CreateExternalTableNode {
         if self.constraints.is_some() {
             len += 1;
         }
+        if !self.column_defaults.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.CreateExternalTableNode", len)?;
         if let Some(v) = self.name.as_ref() {
             struct_ser.serialize_field("name", v)?;
@@ -4069,6 +4072,9 @@ impl serde::Serialize for CreateExternalTableNode {
         if let Some(v) = self.constraints.as_ref() {
             struct_ser.serialize_field("constraints", v)?;
         }
+        if !self.column_defaults.is_empty() {
+            struct_ser.serialize_field("columnDefaults", &self.column_defaults)?;
+        }
         struct_ser.end()
     }
 }
@@ -4099,6 +4105,8 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
             "unbounded",
             "options",
             "constraints",
+            "column_defaults",
+            "columnDefaults",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -4117,6 +4125,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
             Unbounded,
             Options,
             Constraints,
+            ColumnDefaults,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -4152,6 +4161,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                             "unbounded" => Ok(GeneratedField::Unbounded),
                             "options" => Ok(GeneratedField::Options),
                             "constraints" => Ok(GeneratedField::Constraints),
+                            "columnDefaults" | "column_defaults" => Ok(GeneratedField::ColumnDefaults),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -4185,6 +4195,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                 let mut unbounded__ = None;
                 let mut options__ = None;
                 let mut constraints__ = None;
+                let mut column_defaults__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Name => {
@@ -4273,6 +4284,14 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                             }
                             constraints__ = map_.next_value()?;
                         }
+                        GeneratedField::ColumnDefaults => {
+                            if column_defaults__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("columnDefaults"));
+                            }
+                            column_defaults__ = Some(
+                                map_.next_value::<std::collections::HashMap<_, _>>()?
+                            );
+                        }
                     }
                 }
                 Ok(CreateExternalTableNode {
@@ -4290,6 +4309,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                     unbounded: unbounded__.unwrap_or_default(),
                     options: options__.unwrap_or_default(),
                     constraints: constraints__,
+                    column_defaults: column_defaults__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index b9fb616b31..8b4dd1b759 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -360,6 +360,11 @@ pub struct CreateExternalTableNode {
     >,
     #[prost(message, optional, tag = "15")]
     pub constraints: ::core::option::Option<Constraints>,
+    #[prost(map = "string, message", tag = "16")]
+    pub column_defaults: ::std::collections::HashMap<
+        ::prost::alloc::string::String,
+        LogicalExprNode,
+    >,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 851f062bd5..50bca0295d 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -521,6 +522,13 @@ impl AsLogicalPlan for LogicalPlanNode {
                     order_exprs.push(order_expr)
                 }
 
+                let mut column_defaults =
+                    HashMap::with_capacity(create_extern_table.column_defaults.len());
+                for (col_name, expr) in &create_extern_table.column_defaults {
+                    let expr = from_proto::parse_expr(expr, ctx)?;
+                    column_defaults.insert(col_name.clone(), expr);
+                }
+
                 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
                     schema: pb_schema.try_into()?,
                     name: from_owned_table_reference(create_extern_table.name.as_ref(), "CreateExternalTable")?,
@@ -540,6 +548,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     unbounded: create_extern_table.unbounded,
                     options: create_extern_table.options.clone(),
                     constraints: constraints.into(),
+                    column_defaults,
                 })))
             }
             LogicalPlanType::CreateView(create_view) => {
@@ -1298,6 +1307,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     unbounded,
                     options,
                     constraints,
+                    column_defaults,
                 },
             )) => {
                 let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
@@ -1312,6 +1322,12 @@ impl AsLogicalPlan for LogicalPlanNode {
                     converted_order_exprs.push(temp);
                 }
 
+                let mut converted_column_defaults =
+                    HashMap::with_capacity(column_defaults.len());
+                for (col_name, expr) in column_defaults {
+                    converted_column_defaults.insert(col_name.clone(), expr.try_into()?);
+                }
+
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                         protobuf::CreateExternalTableNode {
@@ -1329,6 +1345,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             unbounded: *unbounded,
                             options: options.clone(),
                             constraints: Some(constraints.clone().into()),
+                            column_defaults: converted_column_defaults,
                         },
                     )),
                 })
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index e04a7a9c9d..5e36a838f3 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -217,11 +217,10 @@ async fn roundtrip_custom_memory_tables() -> Result<()> {
 async fn roundtrip_custom_listing_tables() -> Result<()> {
     let ctx = SessionContext::new();
 
-    // Make sure during round-trip, constraint information is preserved
     let query = "CREATE EXTERNAL TABLE multiple_ordered_table_with_pk (
               a0 INTEGER,
-              a INTEGER,
-              b INTEGER,
+              a INTEGER DEFAULT 1*2 + 3,
+              b INTEGER DEFAULT NULL,
               c INTEGER,
               d INTEGER,
               primary key(c)
@@ -232,11 +231,13 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
             WITH ORDER (c ASC)
             LOCATION '../core/tests/data/window_2.csv';";
 
-    let plan = ctx.sql(query).await?.into_optimized_plan()?;
+    let plan = ctx.state().create_logical_plan(query).await?;
 
     let bytes = logical_plan_to_bytes(&plan)?;
     let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
-    assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+    // Use exact matching to verify everything. Make sure during round-trip,
+    // information like constraints, column defaults, and other aspects of the plan are preserved.
+    assert_eq!(plan, logical_round_trip);
 
     Ok(())
 }
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 4220e83316..12083554f0 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -762,11 +762,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             )?;
         }
 
+        let mut planner_context = PlannerContext::new();
+
+        let column_defaults = self
+            .build_column_defaults(&columns, &mut planner_context)?
+            .into_iter()
+            .collect();
+
         let schema = self.build_schema(columns)?;
         let df_schema = schema.to_dfschema_ref()?;
 
         let ordered_exprs =
-            self.build_order_by(order_exprs, &df_schema, &mut PlannerContext::new())?;
+            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
 
         // External tables do not support schemas at the moment, so the name is just a table name
         let name = OwnedTableReference::bare(name);
@@ -788,6 +795,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 unbounded,
                 options,
                 constraints,
+                column_defaults,
             },
         )))
     }
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index 75252b3b7c..e20b377945 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -382,6 +382,27 @@ select a,b,c,d from test_column_defaults
 1 10 100 ABC
 NULL 20 500 default_text
 
+# fill the timestamp column with default value `now()` again, it should be different from the previous one
+query IIITP
+insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF')
+----
+1
+
+# Ensure that the default expression `now()` is evaluated during insertion, not optimized away.
+# Rows are inserted during different time, so their timestamp values should be different.
+query I rowsort
+select count(distinct e) from test_column_defaults
+----
+3
+
+# Expect all rows to be true as now() was inserted into the table
+query B rowsort
+select e < now() from test_column_defaults
+----
+true
+true
+true
+
 statement ok
 drop table test_column_defaults
 
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 39323479ff..85c2db7faa 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -543,3 +543,70 @@ select * from table_without_values;
 
 statement ok
 drop table table_without_values;
+
+
+### Test for specifying column's default value
+
+statement ok
+CREATE EXTERNAL TABLE test_column_defaults(
+  a int,
+  b int not null default null,
+  c int default 100*2+300,
+  d text default lower('DEFAULT_TEXT'),
+  e timestamp default now()
+) STORED AS parquet
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6'
+OPTIONS (create_local_path 'true');
+
+# fill in all column values
+query IIITP
+insert into test_column_defaults values(1, 10, 100, 'ABC', now())
+----
+1
+
+statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
+insert into test_column_defaults(a) values(2)
+
+query IIITP
+insert into test_column_defaults(b) values(20)
+----
+1
+
+query IIIT rowsort
+select a,b,c,d from test_column_defaults
+----
+1 10 100 ABC
+NULL 20 500 default_text
+
+# fill the timestamp column with default value `now()` again, it should be different from the previous one
+query IIITP
+insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF')
+----
+1
+
+# Ensure that the default expression `now()` is evaluated during insertion, not optimized away.
+# Rows are inserted during different time, so their timestamp values should be different.
+query I rowsort
+select count(distinct e) from test_column_defaults
+----
+3
+
+# Expect all rows to be true as now() was inserted into the table
+query B rowsort
+select e < now() from test_column_defaults
+----
+true
+true
+true
+
+statement ok
+drop table test_column_defaults
+
+# test invalid default value
+statement error DataFusion error: Error during planning: Column reference is not allowed in the DEFAULT expression : Schema error: No field named a.
+CREATE EXTERNAL TABLE test_column_defaults(
+  a int,
+  b int default a+1
+) STORED AS parquet
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7'
+OPTIONS (create_local_path 'true');

Reply via email to