This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 23aec321bd8 [SPARK-41049][SQL][FOLLOWUP] Move expression
initialization code to the base class
23aec321bd8 is described below
commit 23aec321bd822867a698ee3bc000017b21753ce8
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Jan 3 10:46:44 2023 -0800
[SPARK-41049][SQL][FOLLOWUP] Move expression initialization code to the
base class
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/39248 , to add
one more code cleanup. The expression initialization code is duplicated 6 times
and we should put it in the base class.
### Why are the changes needed?
code cleanup
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #39364 from cloud-fan/expr.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../spark/sql/catalyst/expressions/ExpressionsEvaluator.scala | 7 +++++++
.../sql/catalyst/expressions/InterpretedMutableProjection.scala | 5 +----
.../spark/sql/catalyst/expressions/InterpretedSafeProjection.scala | 5 +----
.../sql/catalyst/expressions/InterpretedUnsafeProjection.scala | 5 +----
.../org/apache/spark/sql/catalyst/expressions/Projection.scala | 5 +----
.../org/apache/spark/sql/catalyst/expressions/predicates.scala | 6 +-----
6 files changed, 12 insertions(+), 21 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala
index dcbc6926cd3..1fc0144fede 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala
@@ -42,4 +42,11 @@ trait ExpressionsEvaluator {
* The default implementation does nothing.
*/
def initialize(partitionIndex: Int): Unit = {}
+
+ protected def initializeExprs(exprs: Seq[Expression], partitionIndex: Int):
Unit = {
+ exprs.foreach(_.foreach {
+ case n: Nondeterministic => n.initialize(partitionIndex)
+ case _ =>
+ })
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
index 682604b9bf7..01e9de085da 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
@@ -41,10 +41,7 @@ class InterpretedMutableProjection(expressions:
Seq[Expression]) extends Mutable
private[this] val buffer = new Array[Any](expressions.size)
override def initialize(partitionIndex: Int): Unit = {
- exprs.foreach(_.foreach {
- case n: Nondeterministic => n.initialize(partitionIndex)
- case _ =>
- })
+ initializeExprs(exprs, partitionIndex)
}
private[this] val validExprs = expressions.zipWithIndex.filter {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
index 84263d97f5d..87539e80b0b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
@@ -101,10 +101,7 @@ class InterpretedSafeProjection(expressions:
Seq[Expression]) extends Projection
}
override def initialize(partitionIndex: Int): Unit = {
- expressions.foreach(_.foreach {
- case n: Nondeterministic => n.initialize(partitionIndex)
- case _ =>
- })
+ initializeExprs(exprs, partitionIndex)
}
override def apply(row: InternalRow): InternalRow = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index 9108a045c09..90a90444695 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -67,10 +67,7 @@ class InterpretedUnsafeProjection(expressions:
Array[Expression]) extends Unsafe
}
override def initialize(partitionIndex: Int): Unit = {
- exprs.foreach(_.foreach {
- case n: Nondeterministic => n.initialize(partitionIndex)
- case _ =>
- })
+ initializeExprs(exprs, partitionIndex)
}
override def apply(row: InternalRow): UnsafeRow = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 20969fa584a..7d993d776d1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -41,10 +41,7 @@ class InterpretedProjection(expressions: Seq[Expression])
extends Projection {
}
override def initialize(partitionIndex: Int): Unit = {
- exprArray.foreach(_.foreach {
- case n: Nondeterministic => n.initialize(partitionIndex)
- case _ =>
- })
+ initializeExprs(exprArray, partitionIndex)
}
def apply(input: InternalRow): InternalRow = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 4e4ac6ee492..6a58f8d3416 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -53,11 +53,7 @@ case class InterpretedPredicate(expression: Expression)
extends BasePredicate {
}
override def initialize(partitionIndex: Int): Unit = {
- super.initialize(partitionIndex)
- expr.foreach {
- case n: Nondeterministic => n.initialize(partitionIndex)
- case _ =>
- }
+ initializeExprs(Seq(expr), partitionIndex)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]