This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c476641812c Revert "[SPARK-45649][SQL] Unify the prepare framework for `OffsetWindowFunctionFrame`"
c476641812c is described below

commit c476641812cc1be6aa6e4f573501cdef2dc9ef89
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Nov 21 15:54:59 2023 +0800

    Revert "[SPARK-45649][SQL] Unify the prepare framework for `OffsetWindowFunctionFrame`"
    
    This reverts commit 3d6e31a13a4b1260a13752f3a6b9fde92fd050ce.
---
 .../sql/execution/window/WindowFunctionFrame.scala | 118 ++++++++++-----------
 1 file changed, 57 insertions(+), 61 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index d49e5ed5662..a849c3894f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -86,8 +86,7 @@ abstract class OffsetWindowFunctionFrameBase(
     expressions: Array[OffsetWindowFunction],
     inputSchema: Seq[Attribute],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
-    offset: Int,
-    ignoreNulls: Boolean)
+    offset: Int)
   extends WindowFunctionFrame {
 
   /** Rows of the partition currently being processed. */
@@ -141,8 +140,6 @@ abstract class OffsetWindowFunctionFrameBase(
   // is not null.
   protected var skippedNonNullCount = 0
 
-  protected val absOffset = Math.abs(offset)
-
   // Reset the states by the data of the new partition.
   protected def resetStates(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
     input = rows
@@ -178,31 +175,6 @@ abstract class OffsetWindowFunctionFrameBase(
     }
   }
 
-  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
-    if (absOffset > rows.length) {
-      fillDefaultValue(EmptyRow)
-    } else {
-      resetStates(rows)
-      if (ignoreNulls) {
-        prepareForIgnoreNulls()
-      } else {
-        prepareForRespectNulls()
-      }
-    }
-  }
-
-  protected def prepareForIgnoreNulls(): Unit = findNextRowWithNonNullInput()
-
-  protected def prepareForRespectNulls(): Unit
-
-  override def write(index: Int, current: InternalRow): Unit = {
-    if (input != null) {
-      doWrite(index, current)
-    }
-  }
-
-  protected def doWrite(index: Int, current: InternalRow): Unit
-
   override def currentLowerBound(): Int = throw new UnsupportedOperationException()
 
   override def currentUpperBound(): Int = throw new UnsupportedOperationException()
@@ -224,15 +196,24 @@ class FrameLessOffsetWindowFunctionFrame(
     offset: Int,
     ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
-    target, ordinal, expressions, inputSchema, newMutableProjection, offset, ignoreNulls) {
+    target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
-  override def prepareForRespectNulls(): Unit = {
-    // drain the first few rows if offset is larger than zero
-    while (inputIndex < offset) {
-      if (inputIterator.hasNext) inputIterator.next()
-      inputIndex += 1
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+    resetStates(rows)
+    if (ignoreNulls) {
+      if (Math.abs(offset) > rows.length) {
+        fillDefaultValue(EmptyRow)
+      } else {
+        findNextRowWithNonNullInput()
+      }
+    } else {
+      // drain the first few rows if offset is larger than zero
+      while (inputIndex < offset) {
+        if (inputIterator.hasNext) inputIterator.next()
+        inputIndex += 1
+      }
+      inputIndex = offset
     }
-    inputIndex = offset
   }
 
   private val doWrite = if (ignoreNulls && offset > 0) {
@@ -279,6 +260,7 @@ class FrameLessOffsetWindowFunctionFrame(
     // 7. current row -> z, next selected row -> y, output: y;
     // 8. current row -> v, next selected row -> z, output: z;
     // 9. current row -> null, next selected row -> v, output: v;
+    val absOffset = Math.abs(offset)
     (current: InternalRow) =>
       if (skippedNonNullCount == absOffset) {
         nextSelectedRow = EmptyRow
@@ -312,7 +294,7 @@ class FrameLessOffsetWindowFunctionFrame(
       inputIndex += 1
   }
 
-  protected def doWrite(index: Int, current: InternalRow): Unit = {
+  override def write(index: Int, current: InternalRow): Unit = {
     doWrite(current)
   }
 }
@@ -335,30 +317,35 @@ class UnboundedOffsetWindowFunctionFrame(
     offset: Int,
     ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
-    target, ordinal, expressions, inputSchema, newMutableProjection, offset, ignoreNulls) {
+    target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
   assert(offset > 0)
 
-  override def prepareForIgnoreNulls(): Unit = {
-    findNextRowWithNonNullInput()
-    if (nextSelectedRow == EmptyRow) {
-      // Use default values since the offset row whose input value is not null does not exist.
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+    if (offset > rows.length) {
       fillDefaultValue(EmptyRow)
     } else {
-      projection(nextSelectedRow)
-    }
-  }
-
-  override def prepareForRespectNulls(): Unit = {
-    var selectedRow: UnsafeRow = null
-    // drain the first few rows if offset is larger than one
-    while (inputIndex < offset) {
-      selectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
-      inputIndex += 1
+      resetStates(rows)
+      if (ignoreNulls) {
+        findNextRowWithNonNullInput()
+        if (nextSelectedRow == EmptyRow) {
+          // Use default values since the offset row whose input value is not null does not exist.
+          fillDefaultValue(EmptyRow)
+        } else {
+          projection(nextSelectedRow)
+        }
+      } else {
+        var selectedRow: UnsafeRow = null
+        // drain the first few rows if offset is larger than one
+        while (inputIndex < offset) {
+          selectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+          inputIndex += 1
+        }
+        projection(selectedRow)
+      }
     }
-    projection(selectedRow)
   }
 
-  protected def doWrite(index: Int, current: InternalRow): Unit = {
+  override def write(index: Int, current: InternalRow): Unit = {
     // The results are the same for each row in the partition, and have been evaluated in prepare.
     // Don't need to recalculate here.
   }
@@ -383,18 +370,27 @@ class UnboundedPrecedingOffsetWindowFunctionFrame(
     offset: Int,
     ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
-    target, ordinal, expressions, inputSchema, newMutableProjection, offset, ignoreNulls) {
+    target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
   assert(offset > 0)
 
-  override def prepareForRespectNulls(): Unit = {
-    // drain the first few rows if offset is larger than one
-    while (inputIndex < offset) {
-      nextSelectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
-      inputIndex += 1
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+    if (offset > rows.length) {
+      fillDefaultValue(EmptyRow)
+    } else {
+      resetStates(rows)
+      if (ignoreNulls) {
+        findNextRowWithNonNullInput()
+      } else {
+        // drain the first few rows if offset is larger than one
+        while (inputIndex < offset) {
+          nextSelectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+          inputIndex += 1
+        }
+      }
     }
   }
 
-  protected def doWrite(index: Int, current: InternalRow): Unit = {
+  override def write(index: Int, current: InternalRow): Unit = {
     if (index >= inputIndex - 1 && nextSelectedRow != null) {
       projection(nextSelectedRow)
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to