Repository: spark
Updated Branches:
  refs/heads/master e3201e165 -> e646ae67f


[SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190 to make it easier to backport.

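`WindowExec#createBoundOrdering` is called on the executor side, so we can't use
`conf.sessionLocalTimeZone` there. Instead, the session time zone is read from `conf` once,
outside that method, and passed to `createBoundOrdering` as a new `timeZone` parameter.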

## How was this patch tested?

Tested in #21190.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21225 from cloud-fan/minor3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e646ae67
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e646ae67
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e646ae67

Branch: refs/heads/master
Commit: e646ae67f2e793204bc819ab2b90815214c2bbf3
Parents: e3201e1
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu May 3 17:27:13 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu May 3 17:27:13 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/window/WindowExec.scala      | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e646ae67/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 800a2ea..626f39d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -112,9 +112,11 @@ case class WindowExec(
    *
    * @param frame to evaluate. This can either be a Row or Range frame.
    * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
    * @return a bound ordering object.
    */
-  private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = {
+  private[this] def createBoundOrdering(
+      frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
     (frame, bound) match {
       case (RowFrame, CurrentRow) =>
         RowBoundOrdering(0)
@@ -144,7 +146,7 @@ case class WindowExec(
         val boundExpr = (expr.dataType, boundOffset.dataType) match {
           case (DateType, IntegerType) => DateAdd(expr, boundOffset)
           case (TimestampType, CalendarIntervalType) =>
-            TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+            TimeAdd(expr, boundOffset, Some(timeZone))
           case (a, b) if a== b => Add(expr, boundOffset)
         }
         val bound = newMutableProjection(boundExpr :: Nil, child.output)
@@ -197,6 +199,7 @@ case class WindowExec(
 
     // Map the groups to a (unbound) expression and frame factory pair.
     var numExpressions = 0
+    val timeZone = conf.sessionLocalTimeZone
     framedFunctions.toSeq.map {
       case (key, (expressions, functionSeq)) =>
         val ordinal = numExpressions
@@ -237,7 +240,7 @@ case class WindowExec(
               new UnboundedPrecedingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, upper))
+                createBoundOrdering(frameType, upper, timeZone))
             }
 
           // Shrinking Frame.
@@ -246,7 +249,7 @@ case class WindowExec(
               new UnboundedFollowingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, lower))
+                createBoundOrdering(frameType, lower, timeZone))
             }
 
           // Moving Frame.
@@ -255,8 +258,8 @@ case class WindowExec(
               new SlidingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, lower),
-                createBoundOrdering(frameType, upper))
+                createBoundOrdering(frameType, lower, timeZone),
+                createBoundOrdering(frameType, upper, timeZone))
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to