This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e39ffa6b8 extract struct expressions to folders based on spark grouping (#1216)
e39ffa6b8 is described below
commit e39ffa6b83b368fd6ad52da4ebb738ebced14679
Author: Raz Luvaton <[email protected]>
AuthorDate: Mon Jan 6 18:53:23 2025 +0200
extract struct expressions to folders based on spark grouping (#1216)
---
native/spark-expr/src/lib.rs | 5 +-
.../create_named_struct.rs} | 100 +----------------
.../src/struct_funcs/get_struct_field.rs | 125 +++++++++++++++++++++
native/spark-expr/src/struct_funcs/mod.rs | 22 ++++
4 files changed, 152 insertions(+), 100 deletions(-)
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index f35873100..827da5d7d 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -47,7 +47,7 @@ pub use schema_adapter::SparkSchemaAdapterFactory;
pub mod spark_hash;
mod stddev;
pub use stddev::Stddev;
-mod structs;
+mod struct_funcs;
mod sum_decimal;
pub use sum_decimal::SumDecimal;
mod negative;
@@ -72,7 +72,7 @@ pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
pub use list::{ArrayInsert, GetArrayStructFields, ListExtract};
pub use regexp::RLike;
-pub use structs::{CreateNamedStruct, GetStructField};
+pub use struct_funcs::{CreateNamedStruct, GetStructField};
pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
pub use to_json::ToJson;
diff --git a/native/spark-expr/src/structs.rs b/native/spark-expr/src/struct_funcs/create_named_struct.rs
similarity index 64%
rename from native/spark-expr/src/structs.rs
rename to native/spark-expr/src/struct_funcs/create_named_struct.rs
index 7cc49e428..df6312741 100644
--- a/native/spark-expr/src/structs.rs
+++ b/native/spark-expr/src/struct_funcs/create_named_struct.rs
@@ -16,10 +16,10 @@
// under the License.
use arrow::record_batch::RecordBatch;
-use arrow_array::{Array, StructArray};
+use arrow_array::StructArray;
use arrow_schema::{DataType, Field, Schema};
use datafusion::logical_expr::ColumnarValue;
-use datafusion_common::{DataFusionError, Result as DataFusionResult, ScalarValue};
+use datafusion_common::Result as DataFusionResult;
use datafusion_physical_expr::PhysicalExpr;
use std::{
any::Any,
@@ -106,102 +106,6 @@ impl Display for CreateNamedStruct {
}
}
-#[derive(Debug, Eq)]
-pub struct GetStructField {
- child: Arc<dyn PhysicalExpr>,
- ordinal: usize,
-}
-
-impl Hash for GetStructField {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.child.hash(state);
- self.ordinal.hash(state);
- }
-}
-impl PartialEq for GetStructField {
- fn eq(&self, other: &Self) -> bool {
- self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
- }
-}
-
-impl GetStructField {
- pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
- Self { child, ordinal }
- }
-
- fn child_field(&self, input_schema: &Schema) -> DataFusionResult<Arc<Field>> {
- match self.child.data_type(input_schema)? {
- DataType::Struct(fields) => Ok(Arc::clone(&fields[self.ordinal])),
- data_type => Err(DataFusionError::Plan(format!(
- "Expect struct field, got {:?}",
- data_type
- ))),
- }
- }
-}
-
-impl PhysicalExpr for GetStructField {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
- Ok(self.child_field(input_schema)?.data_type().clone())
- }
-
- fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
- Ok(self.child_field(input_schema)?.is_nullable())
- }
-
- fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
- let child_value = self.child.evaluate(batch)?;
-
- match child_value {
- ColumnarValue::Array(array) => {
- let struct_array = array
- .as_any()
- .downcast_ref::<StructArray>()
- .expect("A struct is expected");
-
- Ok(ColumnarValue::Array(Arc::clone(
- struct_array.column(self.ordinal),
- )))
- }
- ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => Ok(ColumnarValue::Array(
- Arc::clone(struct_array.column(self.ordinal)),
- )),
- value => Err(DataFusionError::Execution(format!(
- "Expected a struct array, got {:?}",
- value
- ))),
- }
- }
-
- fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
- vec![&self.child]
- }
-
- fn with_new_children(
- self: Arc<Self>,
- children: Vec<Arc<dyn PhysicalExpr>>,
- ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
- Ok(Arc::new(GetStructField::new(
- Arc::clone(&children[0]),
- self.ordinal,
- )))
- }
-}
-
-impl Display for GetStructField {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "GetStructField [child: {:?}, ordinal: {:?}]",
- self.child, self.ordinal
- )
- }
-}
-
#[cfg(test)]
mod test {
use super::CreateNamedStruct;
diff --git a/native/spark-expr/src/struct_funcs/get_struct_field.rs b/native/spark-expr/src/struct_funcs/get_struct_field.rs
new file mode 100644
index 000000000..c4e1a1e23
--- /dev/null
+++ b/native/spark-expr/src/struct_funcs/get_struct_field.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::buffer::NullBuffer;
+use arrow::record_batch::RecordBatch;
+use arrow_array::{make_array, Array, ArrayRef, StructArray};
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::logical_expr::ColumnarValue;
+use datafusion_common::{DataFusionError, Result as DataFusionResult, ScalarValue};
+use datafusion_physical_expr::PhysicalExpr;
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    hash::Hash,
+    sync::Arc,
+};
+
+/// Spark's `GetStructField` expression: extracts the field at `ordinal` from the
+/// struct produced by `child`.
+///
+/// As in Spark, a row whose parent struct is null evaluates to null, whatever value the
+/// field column physically holds at that position.
+#[derive(Debug, Eq)]
+pub struct GetStructField {
+    /// Expression producing the struct value to read from.
+    child: Arc<dyn PhysicalExpr>,
+    /// Zero-based index of the field to extract.
+    ordinal: usize,
+}
+
+impl Hash for GetStructField {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.ordinal.hash(state);
+    }
+}
+
+impl PartialEq for GetStructField {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
+    }
+}
+
+impl GetStructField {
+    pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
+        Self { child, ordinal }
+    }
+
+    /// Resolves the schema field selected by `ordinal`.
+    ///
+    /// Returns a plan error (rather than panicking) when the child is not a struct or
+    /// `ordinal` is out of range.
+    fn child_field(&self, input_schema: &Schema) -> DataFusionResult<Arc<Field>> {
+        match self.child.data_type(input_schema)? {
+            DataType::Struct(fields) => {
+                fields.get(self.ordinal).map(Arc::clone).ok_or_else(|| {
+                    DataFusionError::Plan(format!(
+                        "Struct field ordinal {} is out of range for a struct with {} fields",
+                        self.ordinal,
+                        fields.len()
+                    ))
+                })
+            }
+            data_type => Err(DataFusionError::Plan(format!(
+                "Expect struct field, got {:?}",
+                data_type
+            ))),
+        }
+    }
+
+    /// Returns the selected field column of `struct_array` with the parent struct's
+    /// validity merged in, so that a null struct row yields a null field value.
+    fn extract_column(&self, struct_array: &StructArray) -> DataFusionResult<ArrayRef> {
+        let column = struct_array.columns().get(self.ordinal).ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "Struct field ordinal {} is out of range for a struct with {} fields",
+                self.ordinal,
+                struct_array.num_columns()
+            ))
+        })?;
+
+        // Arrow does not require a field to be null wherever its parent struct is null,
+        // so the parent's validity has to be applied explicitly. Null-typed columns are
+        // already entirely null and cannot be given a validity buffer.
+        let parent_has_nulls = struct_array.nulls().is_some_and(|n| n.null_count() > 0);
+        if !parent_has_nulls || column.data_type() == &DataType::Null {
+            return Ok(Arc::clone(column));
+        }
+
+        let nulls = NullBuffer::union(struct_array.nulls(), column.nulls());
+        let data = column.to_data().into_builder().nulls(nulls).build()?;
+        Ok(make_array(data))
+    }
+}
+
+impl PhysicalExpr for GetStructField {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
+        Ok(self.child_field(input_schema)?.data_type().clone())
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
+        // Matches Spark: nullable if either the struct itself or the field is nullable.
+        let field_nullable = self.child_field(input_schema)?.is_nullable();
+        Ok(field_nullable || self.child.nullable(input_schema)?)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
+        match self.child.evaluate(batch)? {
+            ColumnarValue::Array(array) => {
+                let struct_array = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| {
+                        DataFusionError::Execution(format!(
+                            "Expected a struct array, got {:?}",
+                            array.data_type()
+                        ))
+                    })?;
+                Ok(ColumnarValue::Array(self.extract_column(struct_array)?))
+            }
+            // A struct scalar is stored as a one-row StructArray. Return a scalar so the
+            // value is broadcast to every row of the batch instead of a length-1 array.
+            ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => {
+                let column = self.extract_column(&struct_array)?;
+                let value = ScalarValue::try_from_array(column.as_ref(), 0)?;
+                Ok(ColumnarValue::Scalar(value))
+            }
+            value => Err(DataFusionError::Execution(format!(
+                "Expected a struct array, got {:?}",
+                value
+            ))),
+        }
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
+        match children.as_slice() {
+            [child] => Ok(Arc::new(GetStructField::new(Arc::clone(child), self.ordinal))),
+            _ => Err(DataFusionError::Internal(format!(
+                "GetStructField expects exactly one child, got {}",
+                children.len()
+            ))),
+        }
+    }
+}
+
+impl Display for GetStructField {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "GetStructField [child: {:?}, ordinal: {:?}]",
+            self.child, self.ordinal
+        )
+    }
+}
diff --git a/native/spark-expr/src/struct_funcs/mod.rs b/native/spark-expr/src/struct_funcs/mod.rs
new file mode 100644
index 000000000..86edcceac
--- /dev/null
+++ b/native/spark-expr/src/struct_funcs/mod.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod create_named_struct;
+mod get_struct_field;
+
+pub use create_named_struct::CreateNamedStruct;
+pub use get_struct_field::GetStructField;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]