This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d28068434c9 [SPARK-41732][SQL][SS] Apply tree-pattern based pruning
for the rule SessionWindowing
d28068434c9 is described below
commit d28068434c96348815afb6fe4883744113af5cde
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Dec 28 12:58:06 2022 +0900
[SPARK-41732][SQL][SS] Apply tree-pattern based pruning for the rule
SessionWindowing
### What changes were proposed in this pull request?
This PR proposes to apply tree-pattern based pruning for the rule
SessionWindowing, to minimize evaluation of the rule to plans containing a SessionWindow node.
### Why are the changes needed?
The rule SessionWindowing is unnecessarily evaluated multiple times without
proper pruning.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #39245 from HeartSaVioR/SPARK-41732.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala | 5 +++--
.../org/apache/spark/sql/catalyst/expressions/SessionWindow.scala | 2 ++
.../scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala | 1 +
.../scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala | 1 +
4 files changed, 7 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index df6b1c400bb..be837d72c5a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField,
IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract,
TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter,
LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_WINDOW
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW,
TIME_WINDOW}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType,
Metadata, MetadataBuilder, StructType}
import org.apache.spark.unsafe.types.CalendarInterval
@@ -187,7 +187,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
* This also adds a marker to the session column so that downstream can
easily find the column
* on session window.
*/
- def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+ def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
+ _.containsPattern(SESSION_WINDOW), ruleId) {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val sessionExpressions =
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
index 02273b0c461..021f119e0a1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW,
TreePattern}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -91,6 +92,7 @@ case class SessionWindow(timeColumn: Expression, gapDuration:
Expression) extend
override def dataType: DataType = new StructType()
.add(StructField("start", children.head.dataType))
.add(StructField("end", children.head.dataType))
+ final override val nodePatterns: Seq[TreePattern] = Seq(SESSION_WINDOW)
// This expression is replaced in the analyzer.
override lazy val resolved = false
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 41aa68f0ec6..e824a0b533d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -94,6 +94,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
"org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+ "org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
"org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
"org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
"org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule"
::
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index fbe885bda06..e2e7fca27e0 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -82,6 +82,7 @@ object TreePattern extends Enumeration {
val SCALAR_SUBQUERY: Value = Value
val SCALAR_SUBQUERY_REFERENCE: Value = Value
val SCALA_UDF: Value = Value
+ val SESSION_WINDOW: Value = Value
val SORT: Value = Value
val SUBQUERY_ALIAS: Value = Value
val SUM: Value = Value
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]