This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fa8a0d9fd6 feat: customize column default values for external tables (#8415)
fa8a0d9fd6 is described below
commit fa8a0d9fd609efd24e866a191339075f169f90bd
Author: Jonah Gao <[email protected]>
AuthorDate: Thu Dec 7 05:55:13 2023 +0800
feat: customize column default values for external tables (#8415)
* feat: customize column default values for external tables
* fix test
* tests from reviewing
---
datafusion/core/src/catalog/listing_schema.rs | 1 +
datafusion/core/src/datasource/listing/table.rs | 16 ++++++
.../core/src/datasource/listing_table_factory.rs | 4 +-
datafusion/core/src/datasource/memory.rs | 2 +-
datafusion/expr/src/logical_plan/ddl.rs | 2 +
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 20 +++++++
datafusion/proto/src/generated/prost.rs | 5 ++
datafusion/proto/src/logical_plan/mod.rs | 17 ++++++
.../proto/tests/cases/roundtrip_logical_plan.rs | 11 ++--
datafusion/sql/src/statement.rs | 10 +++-
datafusion/sqllogictest/test_files/insert.slt | 21 +++++++
.../sqllogictest/test_files/insert_to_external.slt | 67 ++++++++++++++++++++++
13 files changed, 169 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index 0d5c49f377..c3c6826895 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -149,6 +149,7 @@ impl ListingSchemaProvider {
unbounded: false,
options: Default::default(),
constraints: Constraints::empty(),
+ column_defaults: Default::default(),
},
)
.await?;
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a3be57db3a..effeacc480 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,6 +17,7 @@
//! The table implementation.
+use std::collections::HashMap;
use std::str::FromStr;
use std::{any::Any, sync::Arc};
@@ -558,6 +559,7 @@ pub struct ListingTable {
collected_statistics: FileStatisticsCache,
infinite_source: bool,
constraints: Constraints,
+ column_defaults: HashMap<String, Expr>,
}
impl ListingTable {
@@ -596,6 +598,7 @@ impl ListingTable {
collected_statistics:
Arc::new(DefaultFileStatisticsCache::default()),
infinite_source,
constraints: Constraints::empty(),
+ column_defaults: HashMap::new(),
};
Ok(table)
@@ -607,6 +610,15 @@ impl ListingTable {
self
}
+ /// Assign column defaults
+ pub fn with_column_defaults(
+ mut self,
+ column_defaults: HashMap<String, Expr>,
+ ) -> Self {
+ self.column_defaults = column_defaults;
+ self
+ }
+
/// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
///
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
@@ -844,6 +856,10 @@ impl TableProvider for ListingTable {
.create_writer_physical_plan(input, state, config,
order_requirements)
.await
}
+
+ fn get_column_default(&self, column: &str) -> Option<&Expr> {
+ self.column_defaults.get(column)
+ }
}
impl ListingTable {
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index f70a820351..96436306c6 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -228,7 +228,8 @@ impl TableProviderFactory for ListingTableFactory {
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
let table = provider
.with_definition(cmd.definition.clone())
- .with_constraints(cmd.constraints.clone());
+ .with_constraints(cmd.constraints.clone())
+ .with_column_defaults(cmd.column_defaults.clone());
Ok(Arc::new(table))
}
}
@@ -279,6 +280,7 @@ mod tests {
unbounded: false,
options: HashMap::new(),
constraints: Constraints::empty(),
+ column_defaults: HashMap::new(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index a841518d9c..7c044b2936 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -19,9 +19,9 @@
use datafusion_physical_plan::metrics::MetricsSet;
use futures::StreamExt;
-use hashbrown::HashMap;
use log::debug;
use std::any::Any;
+use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::sync::Arc;
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 97551a941a..e74992d993 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -194,6 +194,8 @@ pub struct CreateExternalTable {
pub options: HashMap<String, String>,
/// The list of constraints in the schema, such as primary key, unique, etc.
pub constraints: Constraints,
+ /// Default values for columns
+ pub column_defaults: HashMap<String, Expr>,
}
// Hashing refers to a subset of fields considered in PartialEq.
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index e46e70a139..64b8e28074 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -216,6 +216,7 @@ message CreateExternalTableNode {
bool unbounded = 14;
map<string, string> options = 11;
Constraints constraints = 15;
+ map<string, LogicalExprNode> column_defaults = 16;
}
message PrepareNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index a1c1775419..34ad63d819 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4026,6 +4026,9 @@ impl serde::Serialize for CreateExternalTableNode {
if self.constraints.is_some() {
len += 1;
}
+ if !self.column_defaults.is_empty() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.CreateExternalTableNode", len)?;
if let Some(v) = self.name.as_ref() {
struct_ser.serialize_field("name", v)?;
@@ -4069,6 +4072,9 @@ impl serde::Serialize for CreateExternalTableNode {
if let Some(v) = self.constraints.as_ref() {
struct_ser.serialize_field("constraints", v)?;
}
+ if !self.column_defaults.is_empty() {
+ struct_ser.serialize_field("columnDefaults",
&self.column_defaults)?;
+ }
struct_ser.end()
}
}
@@ -4099,6 +4105,8 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
"unbounded",
"options",
"constraints",
+ "column_defaults",
+ "columnDefaults",
];
#[allow(clippy::enum_variant_names)]
@@ -4117,6 +4125,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
Unbounded,
Options,
Constraints,
+ ColumnDefaults,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -4152,6 +4161,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
"unbounded" => Ok(GeneratedField::Unbounded),
"options" => Ok(GeneratedField::Options),
"constraints" => Ok(GeneratedField::Constraints),
+ "columnDefaults" | "column_defaults" =>
Ok(GeneratedField::ColumnDefaults),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -4185,6 +4195,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
let mut unbounded__ = None;
let mut options__ = None;
let mut constraints__ = None;
+ let mut column_defaults__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Name => {
@@ -4273,6 +4284,14 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
}
constraints__ = map_.next_value()?;
}
+ GeneratedField::ColumnDefaults => {
+ if column_defaults__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("columnDefaults"));
+ }
+ column_defaults__ = Some(
+ map_.next_value::<std::collections::HashMap<_,
_>>()?
+ );
+ }
}
}
Ok(CreateExternalTableNode {
@@ -4290,6 +4309,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
unbounded: unbounded__.unwrap_or_default(),
options: options__.unwrap_or_default(),
constraints: constraints__,
+ column_defaults: column_defaults__.unwrap_or_default(),
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index b9fb616b31..8b4dd1b759 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -360,6 +360,11 @@ pub struct CreateExternalTableNode {
>,
#[prost(message, optional, tag = "15")]
pub constraints: ::core::option::Option<Constraints>,
+ #[prost(map = "string, message", tag = "16")]
+ pub column_defaults: ::std::collections::HashMap<
+ ::prost::alloc::string::String,
+ LogicalExprNode,
+ >,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 851f062bd5..50bca0295d 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
@@ -521,6 +522,13 @@ impl AsLogicalPlan for LogicalPlanNode {
order_exprs.push(order_expr)
}
+ let mut column_defaults =
+
HashMap::with_capacity(create_extern_table.column_defaults.len());
+ for (col_name, expr) in &create_extern_table.column_defaults {
+ let expr = from_proto::parse_expr(expr, ctx)?;
+ column_defaults.insert(col_name.clone(), expr);
+ }
+
Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
name:
from_owned_table_reference(create_extern_table.name.as_ref(),
"CreateExternalTable")?,
@@ -540,6 +548,7 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded: create_extern_table.unbounded,
options: create_extern_table.options.clone(),
constraints: constraints.into(),
+ column_defaults,
})))
}
LogicalPlanType::CreateView(create_view) => {
@@ -1298,6 +1307,7 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded,
options,
constraints,
+ column_defaults,
},
)) => {
let mut converted_order_exprs: Vec<LogicalExprNodeCollection>
= vec![];
@@ -1312,6 +1322,12 @@ impl AsLogicalPlan for LogicalPlanNode {
converted_order_exprs.push(temp);
}
+ let mut converted_column_defaults =
+ HashMap::with_capacity(column_defaults.len());
+ for (col_name, expr) in column_defaults {
+ converted_column_defaults.insert(col_name.clone(),
expr.try_into()?);
+ }
+
Ok(protobuf::LogicalPlanNode {
logical_plan_type:
Some(LogicalPlanType::CreateExternalTable(
protobuf::CreateExternalTableNode {
@@ -1329,6 +1345,7 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded: *unbounded,
options: options.clone(),
constraints: Some(constraints.clone().into()),
+ column_defaults: converted_column_defaults,
},
)),
})
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index e04a7a9c9d..5e36a838f3 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -217,11 +217,10 @@ async fn roundtrip_custom_memory_tables() -> Result<()> {
async fn roundtrip_custom_listing_tables() -> Result<()> {
let ctx = SessionContext::new();
- // Make sure during round-trip, constraint information is preserved
let query = "CREATE EXTERNAL TABLE multiple_ordered_table_with_pk (
a0 INTEGER,
- a INTEGER,
- b INTEGER,
+ a INTEGER DEFAULT 1*2 + 3,
+ b INTEGER DEFAULT NULL,
c INTEGER,
d INTEGER,
primary key(c)
@@ -232,11 +231,13 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';";
- let plan = ctx.sql(query).await?.into_optimized_plan()?;
+ let plan = ctx.state().create_logical_plan(query).await?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
- assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+ // Use exact matching to verify everything. Make sure during round-trip,
+ // information like constraints, column defaults, and other aspects of the plan are preserved.
+ assert_eq!(plan, logical_round_trip);
Ok(())
}
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 4220e83316..12083554f0 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -762,11 +762,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;
}
+ let mut planner_context = PlannerContext::new();
+
+ let column_defaults = self
+ .build_column_defaults(&columns, &mut planner_context)?
+ .into_iter()
+ .collect();
+
let schema = self.build_schema(columns)?;
let df_schema = schema.to_dfschema_ref()?;
let ordered_exprs =
- self.build_order_by(order_exprs, &df_schema, &mut
PlannerContext::new())?;
+ self.build_order_by(order_exprs, &df_schema, &mut
planner_context)?;
// External tables do not support schemas at the moment, so the name is just a table name
let name = OwnedTableReference::bare(name);
@@ -788,6 +795,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
unbounded,
options,
constraints,
+ column_defaults,
},
)))
}
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index 75252b3b7c..e20b377945 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -382,6 +382,27 @@ select a,b,c,d from test_column_defaults
1 10 100 ABC
NULL 20 500 default_text
+# fill the timestamp column with default value `now()` again, it should be different from the previous one
+query IIITP
+insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF')
+----
+1
+
+# Ensure that the default expression `now()` is evaluated during insertion, not optimized away.
+# Rows are inserted at different times, so their timestamp values should be different.
+query I rowsort
+select count(distinct e) from test_column_defaults
+----
+3
+
+# Expect all rows to be true as now() was inserted into the table
+query B rowsort
+select e < now() from test_column_defaults
+----
+true
+true
+true
+
statement ok
drop table test_column_defaults
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 39323479ff..85c2db7faa 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -543,3 +543,70 @@ select * from table_without_values;
statement ok
drop table table_without_values;
+
+
+### Test for specifying column's default value
+
+statement ok
+CREATE EXTERNAL TABLE test_column_defaults(
+ a int,
+ b int not null default null,
+ c int default 100*2+300,
+ d text default lower('DEFAULT_TEXT'),
+ e timestamp default now()
+) STORED AS parquet
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6'
+OPTIONS (create_local_path 'true');
+
+# fill in all column values
+query IIITP
+insert into test_column_defaults values(1, 10, 100, 'ABC', now())
+----
+1
+
+statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
+insert into test_column_defaults(a) values(2)
+
+query IIITP
+insert into test_column_defaults(b) values(20)
+----
+1
+
+query IIIT rowsort
+select a,b,c,d from test_column_defaults
+----
+1 10 100 ABC
+NULL 20 500 default_text
+
+# fill the timestamp column with default value `now()` again, it should be different from the previous one
+query IIITP
+insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF')
+----
+1
+
+# Ensure that the default expression `now()` is evaluated during insertion, not optimized away.
+# Rows are inserted at different times, so their timestamp values should be different.
+query I rowsort
+select count(distinct e) from test_column_defaults
+----
+3
+
+# Expect all rows to be true as now() was inserted into the table
+query B rowsort
+select e < now() from test_column_defaults
+----
+true
+true
+true
+
+statement ok
+drop table test_column_defaults
+
+# test invalid default value
+statement error DataFusion error: Error during planning: Column reference is not allowed in the DEFAULT expression : Schema error: No field named a.
+CREATE EXTERNAL TABLE test_column_defaults(
+ a int,
+ b int default a+1
+) STORED AS parquet
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7'
+OPTIONS (create_local_path 'true');