This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e221a2c5b0 feat: support customize metadata in alias for dataframe api
(#15120)
e221a2c5b0 is described below
commit e221a2c5b0dabbc04c6bb9112b988042ded2b1e1
Author: Chen Chongchen <[email protected]>
AuthorDate: Thu Mar 13 23:42:54 2025 +0800
feat: support customize metadata in alias for dataframe api (#15120)
* feat: support customize metadata in alias for dataframe api
* update doc
* remove clone
---
datafusion/core/src/dataframe/mod.rs | 1 +
datafusion/core/tests/dataframe/mod.rs | 15 ++++
datafusion/expr/src/expr.rs | 80 +++++++++++++++++++++-
datafusion/expr/src/expr_schema.rs | 11 ++-
datafusion/expr/src/tree_node.rs | 5 +-
.../optimizer/src/optimize_projections/mod.rs | 5 +-
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 19 +++++
datafusion/proto/src/generated/prost.rs | 5 ++
datafusion/proto/src/logical_plan/to_proto.rs | 4 ++
datafusion/sql/src/unparser/rewrite.rs | 1 +
11 files changed, 143 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/src/dataframe/mod.rs
b/datafusion/core/src/dataframe/mod.rs
index e998e489a9..e87cc81300 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2039,6 +2039,7 @@ impl DataFrame {
})),
relation: None,
name: field.name().to_string(),
+ metadata: None,
}),
Err(_) => col(field.name()),
}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index a902cf8ae6..986e2a0335 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -5374,6 +5374,21 @@ async fn test_alias() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_alias_with_metadata() -> Result<()> {
+    // Attach metadata through the column alias, then re-qualify the result
+    // under a table alias and check the metadata is still on the output field.
+    let metadata = HashMap::from([(String::from("k"), String::from("v"))]);
+    let aliased = create_test_table("test")
+        .await?
+        .select(vec![col("a").alias_with_metadata("b", Some(metadata))])?
+        .alias("table_alias")?;
+    let projected = aliased.select(vec![col("table_alias.b")])?;
+    let field_metadata = projected.schema().field(0).metadata();
+    assert_eq!(field_metadata.get("k").map(String::as_str), Some("v"));
+    Ok(())
+}
+
// Use alias to perform a self-join
// Issue: https://github.com/apache/datafusion/issues/14112
#[tokio::test]
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 5627963225..6f110895a4 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -391,11 +391,34 @@ impl Unnest {
}
/// Alias expression
-#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Alias {
pub expr: Box<Expr>,
pub relation: Option<TableReference>,
pub name: String,
+    /// Optional key/value metadata for the aliased output field. When present
+    /// and non-empty it is merged over the metadata of the inner expression.
+    pub metadata: Option<std::collections::HashMap<String, String>>,
+}
+
+/// Hashes `expr`, `relation` and `name` only.
+///
+/// `metadata` is skipped because `HashMap` does not implement `Hash`. This
+/// stays consistent with `Eq`: aliases that compare equal have equal
+/// `expr`/`relation`/`name`, and therefore equal hashes.
+impl Hash for Alias {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.expr.hash(state);
+        self.relation.hash(state);
+        self.name.hash(state);
+    }
+}
+
+/// Orders by `expr`, then `relation`, then `name`, then `metadata`.
+///
+/// `metadata` must take part in the comparison so that `partial_cmp` returns
+/// `Some(Equal)` exactly when `==` holds (the derived `PartialEq` compares
+/// `metadata`). `HashMap` has no ordering, so its entries are compared in
+/// sorted key/value order.
+impl PartialOrd for Alias {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        /// Borrowed metadata entries in a deterministic (sorted) order.
+        fn sorted_entries(
+            metadata: Option<&std::collections::HashMap<String, String>>,
+        ) -> Option<Vec<(&String, &String)>> {
+            metadata.map(|m| {
+                let mut entries: Vec<_> = m.iter().collect();
+                entries.sort_unstable();
+                entries
+            })
+        }
+
+        let cmp = self.expr.partial_cmp(&other.expr);
+        let Some(std::cmp::Ordering::Equal) = cmp else {
+            return cmp;
+        };
+        let cmp = self.relation.partial_cmp(&other.relation);
+        let Some(std::cmp::Ordering::Equal) = cmp else {
+            return cmp;
+        };
+        let cmp = self.name.partial_cmp(&other.name);
+        let Some(std::cmp::Ordering::Equal) = cmp else {
+            return cmp;
+        };
+        sorted_entries(self.metadata.as_ref())
+            .partial_cmp(&sorted_entries(other.metadata.as_ref()))
+    }
}
impl Alias {
@@ -409,8 +432,17 @@ impl Alias {
expr: Box::new(expr),
relation: relation.map(|r| r.into()),
name: name.into(),
+ metadata: None,
}
}
+
+    /// Sets the metadata of this alias, replacing any metadata it already
+    /// carried, and returns the updated alias.
+    ///
+    /// Passing `None` clears the metadata.
+    pub fn with_metadata(
+        mut self,
+        metadata: Option<std::collections::HashMap<String, String>>,
+    ) -> Self {
+        self.metadata = metadata;
+        self
+    }
}
/// Binary expression
@@ -1278,6 +1310,27 @@ impl Expr {
Expr::Alias(Alias::new(self, None::<&str>, name.into()))
}
+ /// Return `self AS name` alias expression with metadata
+ ///
+ /// The metadata will be attached to the Arrow Schema field when the
expression
+ /// is converted to a field via `Expr.to_field()`.
+ ///
+ /// # Example
+ /// ```
+ /// # use datafusion_expr::col;
+ /// use std::collections::HashMap;
+ /// let metadata = HashMap::from([("key".to_string(),
"value".to_string())]);
+ /// let expr = col("foo").alias_with_metadata("bar", Some(metadata));
+ /// ```
+ ///
+ pub fn alias_with_metadata(
+ self,
+ name: impl Into<String>,
+ metadata: Option<std::collections::HashMap<String, String>>,
+ ) -> Expr {
+ Expr::Alias(Alias::new(self, None::<&str>,
name.into()).with_metadata(metadata))
+ }
+
/// Return `self AS name` alias expression with a specific qualifier
pub fn alias_qualified(
self,
@@ -1287,6 +1340,28 @@ impl Expr {
Expr::Alias(Alias::new(self, relation, name.into()))
}
+ /// Return `self AS name` alias expression with a specific qualifier and
metadata
+ ///
+ /// The metadata will be attached to the Arrow Schema field when the
expression
+ /// is converted to a field via `Expr.to_field()`.
+ ///
+ /// # Example
+ /// ```
+ /// # use datafusion_expr::col;
+ /// use std::collections::HashMap;
+ /// let metadata = HashMap::from([("key".to_string(),
"value".to_string())]);
+ /// let expr = col("foo").alias_qualified_with_metadata(Some("tbl"),
"bar", Some(metadata));
+ /// ```
+ ///
+ pub fn alias_qualified_with_metadata(
+ self,
+ relation: Option<impl Into<TableReference>>,
+ name: impl Into<String>,
+ metadata: Option<std::collections::HashMap<String, String>>,
+ ) -> Expr {
+ Expr::Alias(Alias::new(self, relation,
name.into()).with_metadata(metadata))
+ }
+
/// Remove an alias from an expression if one exists.
///
/// If the expression is not an alias, the expression is returned
unchanged.
@@ -1738,11 +1813,13 @@ impl NormalizeEq for Expr {
expr: self_expr,
relation: self_relation,
name: self_name,
+ ..
}),
Expr::Alias(Alias {
expr: other_expr,
relation: other_relation,
name: other_name,
+ ..
}),
) => {
self_name == other_name
@@ -2088,6 +2165,7 @@ impl HashNode for Expr {
expr: _expr,
relation,
name,
+ ..
}) => {
relation.hash(state);
name.hash(state);
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 0a14cb5c60..318640282f 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -343,7 +343,16 @@ impl ExprSchemable for Expr {
fn metadata(&self, schema: &dyn ExprSchema) -> Result<HashMap<String,
String>> {
match self {
Expr::Column(c) => Ok(schema.metadata(c)?.clone()),
- Expr::Alias(Alias { expr, .. }) => expr.metadata(schema),
+ Expr::Alias(Alias { expr, metadata, .. }) => {
+ let mut ret = expr.metadata(schema)?;
+ if let Some(metadata) = metadata {
+ if !metadata.is_empty() {
+ ret.extend(metadata.clone());
+ return Ok(ret);
+ }
+ }
+ Ok(ret)
+ }
Expr::Cast(Cast { expr, .. }) => expr.metadata(schema),
_ => Ok(HashMap::new()),
}
diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs
index 49cc79c60a..f20dab7e16 100644
--- a/datafusion/expr/src/tree_node.rs
+++ b/datafusion/expr/src/tree_node.rs
@@ -132,7 +132,10 @@ impl TreeNode for Expr {
expr,
relation,
name,
- }) => f(*expr)?.update_data(|e| e.alias_qualified(relation, name)),
+ metadata,
+ }) => f(*expr)?.update_data(|e| {
+ e.alias_qualified_with_metadata(relation, name, metadata)
+ }),
Expr::InSubquery(InSubquery {
expr,
subquery,
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
index b7dd391586..b3a09e2dcb 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -492,8 +492,11 @@ fn merge_consecutive_projections(proj: Projection) ->
Result<Transformed<Project
expr,
relation,
name,
+ metadata,
}) => rewrite_expr(*expr, &prev_projection).map(|result| {
- result.update_data(|expr| Expr::Alias(Alias::new(expr,
relation, name)))
+ result.update_data(|expr| {
+ Expr::Alias(Alias::new(expr, relation,
name).with_metadata(metadata))
+ })
}),
e => rewrite_expr(e, &prev_projection),
}
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 1cdfe6d216..81e1a5c418 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -484,6 +484,7 @@ message AliasNode {
LogicalExprNode expr = 1;
string alias = 2;
repeated TableReference relation = 3;
+ map<string, string> metadata = 4;
}
message BinaryExprNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 6e09e9a797..664fb706f8 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -770,6 +770,9 @@ impl serde::Serialize for AliasNode {
if !self.relation.is_empty() {
len += 1;
}
+ if !self.metadata.is_empty() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.AliasNode", len)?;
if let Some(v) = self.expr.as_ref() {
struct_ser.serialize_field("expr", v)?;
@@ -780,6 +783,9 @@ impl serde::Serialize for AliasNode {
if !self.relation.is_empty() {
struct_ser.serialize_field("relation", &self.relation)?;
}
+ if !self.metadata.is_empty() {
+ struct_ser.serialize_field("metadata", &self.metadata)?;
+ }
struct_ser.end()
}
}
@@ -793,6 +799,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
"expr",
"alias",
"relation",
+ "metadata",
];
#[allow(clippy::enum_variant_names)]
@@ -800,6 +807,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
Expr,
Alias,
Relation,
+ Metadata,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -824,6 +832,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
"expr" => Ok(GeneratedField::Expr),
"alias" => Ok(GeneratedField::Alias),
"relation" => Ok(GeneratedField::Relation),
+ "metadata" => Ok(GeneratedField::Metadata),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -846,6 +855,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
let mut expr__ = None;
let mut alias__ = None;
let mut relation__ = None;
+ let mut metadata__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Expr => {
@@ -866,12 +876,21 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
}
relation__ = Some(map_.next_value()?);
}
+ GeneratedField::Metadata => {
+ if metadata__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("metadata"));
+ }
+ metadata__ = Some(
+ map_.next_value::<std::collections::HashMap<_,
_>>()?
+ );
+ }
}
}
Ok(AliasNode {
expr: expr__,
alias: alias__.unwrap_or_default(),
relation: relation__.unwrap_or_default(),
+ metadata: metadata__.unwrap_or_default(),
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index f5ec45da48..8ab175cdf0 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -739,6 +739,11 @@ pub struct AliasNode {
pub alias: ::prost::alloc::string::String,
#[prost(message, repeated, tag = "3")]
pub relation: ::prost::alloc::vec::Vec<TableReference>,
+ #[prost(map = "string, string", tag = "4")]
+ pub metadata: ::std::collections::HashMap<
+ ::prost::alloc::string::String,
+ ::prost::alloc::string::String,
+ >,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BinaryExprNode {
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 5bb0cdb20c..841c31fa03 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -19,6 +19,8 @@
//! DataFusion logical plans to be serialized and transmitted between
//! processes.
+use std::collections::HashMap;
+
use datafusion_common::{TableReference, UnnestOptions};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::expr::{
@@ -200,6 +202,7 @@ pub fn serialize_expr(
expr,
relation,
name,
+ metadata,
}) => {
let alias = Box::new(protobuf::AliasNode {
expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)),
@@ -208,6 +211,7 @@ pub fn serialize_expr(
.map(|r| vec![r.into()])
.unwrap_or(vec![]),
alias: name.to_owned(),
+ metadata: metadata.to_owned().unwrap_or(HashMap::new()),
});
protobuf::LogicalExprNode {
expr_type: Some(ExprType::Alias(alias)),
diff --git a/datafusion/sql/src/unparser/rewrite.rs
b/datafusion/sql/src/unparser/rewrite.rs
index 2e3c8e9e94..aa480cf4ff 100644
--- a/datafusion/sql/src/unparser/rewrite.rs
+++ b/datafusion/sql/src/unparser/rewrite.rs
@@ -363,6 +363,7 @@ pub(super) fn inject_column_aliases(
expr: Box::new(expr.clone()),
relation,
name: col_alias.value,
+ metadata: None,
})
})
.collect::<Vec<_>>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]