This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f27c4c308 chore: Refactor planner random and partition expressions (#3704)
f27c4c308 is described below
commit f27c4c308c522be4f4749d1e60f11547d61929f3
Author: Bhargava Vadlamani <[email protected]>
AuthorDate: Tue Mar 17 11:56:35 2026 -0700
chore: Refactor planner random and partition expressions (#3704)
---
native/core/src/execution/expressions/mod.rs | 2 +
native/core/src/execution/expressions/partition.rs | 57 ++++++++++++++++++++++
native/core/src/execution/expressions/random.rs | 56 +++++++++++++++++++++
native/core/src/execution/planner.rs | 24 +++------
.../src/execution/planner/expression_registry.rs | 30 ++++++++++++
5 files changed, 152 insertions(+), 17 deletions(-)
diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs
index 563d62e91..c2b144b7d 100644
--- a/native/core/src/execution/expressions/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -22,6 +22,8 @@ pub mod bitwise;
pub mod comparison;
pub mod logical;
pub mod nullcheck;
+pub mod partition;
+pub mod random;
pub mod strings;
pub mod subquery;
pub mod temporal;
diff --git a/native/core/src/execution/expressions/partition.rs b/native/core/src/execution/expressions/partition.rs
new file mode 100644
index 000000000..4b0287f8c
--- /dev/null
+++ b/native/core/src/execution/expressions/partition.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use crate::execution::planner::expression_registry::ExpressionBuilder;
+use crate::execution::planner::PhysicalPlanner;
+use arrow::datatypes::SchemaRef;
+use datafusion::common::ScalarValue;
+use datafusion::physical_expr::expressions::Literal;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_proto::spark_expression::Expr;
+use
datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId;
+use std::sync::Arc;
+
+/// Builds the physical expression registered for `ExpressionType::SparkPartitionId`.
+///
+/// The result is a constant `Int32` literal holding the partition id of the
+/// planner that builds it, so the value is fixed when the plan is built rather
+/// than computed per row.
+pub struct SparkPartitionIdBuilder;
+
+impl ExpressionBuilder for SparkPartitionIdBuilder {
+ /// The proto payload and input schema are unused; only the planner's
+ /// partition id is needed.
+ fn build(
+ &self,
+ _spark_expr: &Expr,
+ _input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ Ok(Arc::new(Literal::new(ScalarValue::Int32(Some(
+ planner.partition(),
+ )))))
+ }
+}
+
+/// Builds the physical expression registered for
+/// `ExpressionType::MonotonicallyIncreasingId`.
+///
+/// Delegates to `MonotonicallyIncreasingId::from_partition_id` with the
+/// planner's partition id. How that id is folded into the generated values is
+/// defined by `MonotonicallyIncreasingId` itself (presumably the partition id
+/// occupies the high bits, as in Spark — confirm there).
+pub struct MonotonicallyIncreasingIdBuilder;
+
+impl ExpressionBuilder for MonotonicallyIncreasingIdBuilder {
+ /// The proto payload and input schema are unused; only the planner's
+ /// partition id is needed.
+ fn build(
+ &self,
+ _spark_expr: &Expr,
+ _input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ Ok(Arc::new(MonotonicallyIncreasingId::from_partition_id(
+ planner.partition(),
+ )))
+ }
+}
diff --git a/native/core/src/execution/expressions/random.rs b/native/core/src/execution/expressions/random.rs
new file mode 100644
index 000000000..5ea6092cb
--- /dev/null
+++ b/native/core/src/execution/expressions/random.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use crate::execution::planner::expression_registry::ExpressionBuilder;
+use crate::execution::planner::PhysicalPlanner;
+use crate::extract_expr;
+use arrow::datatypes::SchemaRef;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_proto::spark_expression::Expr;
+use datafusion_comet_spark_expr::{RandExpr, RandnExpr};
+use std::sync::Arc;
+
+/// Builds the physical expression registered for `ExpressionType::Rand`.
+///
+/// The seed carried in the proto is offset by the planner's partition id, so
+/// each partition constructs its `RandExpr` with a different seed.
+pub struct RandBuilder;
+
+impl ExpressionBuilder for RandBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ _input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ // Pull the `Rand` payload out of the generic proto expression
+ // (behaviour on a mismatched variant is defined by `extract_expr!`).
+ let expr = extract_expr!(spark_expr, Rand);
+ // `into()` widens the i32 partition id to the seed's type (presumably
+ // i64 — confirm against the proto); `wrapping_add` wraps instead of
+ // panicking when the seed is near the type's maximum.
+ let seed = expr.seed.wrapping_add(planner.partition().into());
+ Ok(Arc::new(RandExpr::new(seed)))
+ }
+}
+
+/// Builds the physical expression registered for `ExpressionType::Randn`.
+///
+/// Uses the same per-partition seeding as the `Rand` builder: the proto seed is
+/// offset by the planner's partition id before constructing `RandnExpr`.
+pub struct RandnBuilder;
+
+impl ExpressionBuilder for RandnBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ _input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ // Pull the `Randn` payload out of the generic proto expression.
+ let expr = extract_expr!(spark_expr, Randn);
+ // Wrapping add: offset the seed by partition id without an overflow panic.
+ let seed = expr.seed.wrapping_add(planner.partition().into());
+ Ok(Arc::new(RandnExpr::new(seed)))
+ }
+}
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 15bbabe88..bd3775592 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -123,12 +123,11 @@ use datafusion_comet_proto::{
},
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as
SparkPartitioning},
};
-use
datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId;
use datafusion_comet_spark_expr::{
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation,
Covariance, CreateNamedStruct,
DecimalRescaleCheckOverflow, GetArrayStructFields, GetStructField, IfExpr,
ListExtract,
- NormalizeNaNAndZero, RandExpr, RandnExpr, SparkCastOptions, Stddev,
SumDecimal, ToJson,
- UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
+ NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal, ToJson,
UnboundColumn, Variance,
+ WideDecimalBinaryExpr, WideDecimalOp,
};
use itertools::Itertools;
use jni::objects::GlobalRef;
@@ -197,6 +196,11 @@ impl PhysicalPlanner {
&self.session_ctx
}
+ /// Returns the partition id held by this planner (`self.partition`).
+ ///
+ /// Expression builders in `execution::expressions` read it through this
+ /// accessor: `RandBuilder`/`RandnBuilder` use it to offset their seeds, and
+ /// `SparkPartitionIdBuilder`/`MonotonicallyIncreasingIdBuilder` embed it.
+ pub fn partition(&self) -> i32 {
+ self.partition
+ }
+
/// get DataFusion PartitionedFiles from a Spark FilePartition
fn get_partitioned_files(
&self,
@@ -655,20 +659,6 @@ impl PhysicalPlanner {
expr.legacy_negative_index,
)))
}
- ExprStruct::Rand(expr) => {
- let seed = expr.seed.wrapping_add(self.partition.into());
- Ok(Arc::new(RandExpr::new(seed)))
- }
- ExprStruct::Randn(expr) => {
- let seed = expr.seed.wrapping_add(self.partition.into());
- Ok(Arc::new(RandnExpr::new(seed)))
- }
- ExprStruct::SparkPartitionId(_) =>
Ok(Arc::new(DataFusionLiteral::new(
- ScalarValue::Int32(Some(self.partition)),
- ))),
- ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new(
- MonotonicallyIncreasingId::from_partition_id(self.partition),
- )),
ExprStruct::ToCsv(expr) => {
let csv_struct_expr =
self.create_expr(expr.child.as_ref().unwrap(),
Arc::clone(&input_schema))?;
diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs
index 34aa3de17..bf3904d9c 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -184,6 +184,12 @@ impl ExpressionRegistry {
// Register temporal expressions
self.register_temporal_expressions();
+
+ // Register random expressions
+ self.register_random_expressions();
+
+ // Register partition expressions
+ self.register_partition_expressions();
}
/// Register arithmetic expression builders
@@ -386,4 +392,28 @@ impl ExpressionRegistry {
)),
}
}
+
+ /// Register random expression builders:
+ /// `ExpressionType::Rand` -> `RandBuilder` and
+ /// `ExpressionType::Randn` -> `RandnBuilder`.
+ fn register_random_expressions(&mut self) {
+ use crate::execution::expressions::random::*;
+
+ self.builders
+ .insert(ExpressionType::Rand, Box::new(RandBuilder));
+ self.builders
+ .insert(ExpressionType::Randn, Box::new(RandnBuilder));
+ }
+
+ /// Register partition expression builders:
+ /// `ExpressionType::SparkPartitionId` -> `SparkPartitionIdBuilder` and
+ /// `ExpressionType::MonotonicallyIncreasingId` -> `MonotonicallyIncreasingIdBuilder`.
+ /// Both builders read the planner's partition id when building.
+ fn register_partition_expressions(&mut self) {
+ use crate::execution::expressions::partition::*;
+
+ self.builders.insert(
+ ExpressionType::SparkPartitionId,
+ Box::new(SparkPartitionIdBuilder),
+ );
+ self.builders.insert(
+ ExpressionType::MonotonicallyIncreasingId,
+ Box::new(MonotonicallyIncreasingIdBuilder),
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]