This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 87a235c2143 [SPARK-41733][SQL][SS] Apply tree-pattern based pruning
for the rule ResolveWindowTime
87a235c2143 is described below
commit 87a235c2143449bd8da0acee4ec3cd99993155bb
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Dec 28 16:01:25 2022 +0900
[SPARK-41733][SQL][SS] Apply tree-pattern based pruning for the rule
ResolveWindowTime
### What changes were proposed in this pull request?
This PR proposes to apply tree-pattern based pruning for the rule
ResolveWindowTime, to minimize evaluation of the rule to plans that contain a WindowTime node.
### Why are the changes needed?
The rule ResolveWindowTime is unnecessarily evaluated multiple times
without proper pruning.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests. A new test case is added to cover SQL usage of `window_time`.
Closes #39247 from HeartSaVioR/SPARK-41733.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/catalyst/analysis/ResolveTimeWindows.scala | 5 ++--
.../sql/catalyst/expressions/WindowTime.scala | 3 +++
.../sql/catalyst/rules/RuleIdCollection.scala | 1 +
.../spark/sql/catalyst/trees/TreePatterns.scala | 1 +
.../spark/sql/DataFrameTimeWindowingSuite.scala | 27 ++++++++++++++++++++++
5 files changed, 35 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index be837d72c5a..6378f4eedd3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField,
IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract,
TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter,
LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW,
TIME_WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW,
TIME_WINDOW, WINDOW_TIME}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType,
Metadata, MetadataBuilder, StructType}
import org.apache.spark.unsafe.types.CalendarInterval
@@ -287,7 +287,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
* The correct representative event time of a window is ``window.end - 1``.
* */
object ResolveWindowTime extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp
{
+ override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
+ _.containsPattern(WINDOW_TIME), ruleId) {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val windowTimeExpressions =
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
index 1bb934cb202..59b5ca8f2bd 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern,
WINDOW_TIME}
import org.apache.spark.sql.types._
// scalastyle:off line.size.limit line.contains.tab
@@ -52,6 +53,8 @@ case class WindowTime(windowColumn: Expression)
override def dataType: DataType =
child.dataType.asInstanceOf[StructType].head.dataType
+ final override val nodePatterns: Seq[TreePattern] = Seq(WINDOW_TIME)
+
override def prettyName: String = "window_time"
// This expression is replaced in the analyzer.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index e824a0b533d..4cf774b0362 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -94,6 +94,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
"org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+ "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
"org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
"org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
"org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index e2e7fca27e0..9eb8ce21ef2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -90,6 +90,7 @@ object TreePattern extends Enumeration {
val TIME_ZONE_AWARE_EXPRESSION: Value = Value
val TRUE_OR_FALSE_LITERAL: Value = Value
val WINDOW_EXPRESSION: Value = Value
+ val WINDOW_TIME: Value = Value
val UNARY_POSITIVE: Value = Value
val UNPIVOT: Value = Value
val UPDATE_FIELDS: Value = Value
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index a878e0ffa51..0bbb9460feb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -651,4 +651,31 @@ class DataFrameTimeWindowingSuite extends QueryTest with
SharedSparkSession {
)
)
}
+
+ test("window_time in SQL") {
+ withTempView("tmpView") {
+ val df = Seq(
+ ("2016-03-27 19:38:19", 1), ("2016-03-27 19:39:25", 2)
+ ).toDF("time", "value")
+ df.createOrReplaceTempView("tmpView")
+ checkAnswer(
+ spark.sql(
+ s"""
+ |select
+ | CAST(window.start AS string), CAST(window.end AS string),
+ | CAST(window_time(window) AS string), counts
+ |from
+ |(
+ | select window, count(*) AS counts from tmpView
+ | group by window(time, "10 seconds")
+ | order by window.start
+ |)
+ |""".stripMargin),
+ Seq(
+ Row("2016-03-27 19:38:10", "2016-03-27 19:38:20", "2016-03-27
19:38:19.999999", 1),
+ Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", "2016-03-27
19:39:29.999999", 1)
+ )
+ )
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]