This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c476641812c Revert "[SPARK-45649][SQL] Unify the prepare framework for `OffsetWindowFunctionFrame`"
c476641812c is described below
commit c476641812cc1be6aa6e4f573501cdef2dc9ef89
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Nov 21 15:54:59 2023 +0800
Revert "[SPARK-45649][SQL] Unify the prepare framework for `OffsetWindowFunctionFrame`"
This reverts commit 3d6e31a13a4b1260a13752f3a6b9fde92fd050ce.
---
.../sql/execution/window/WindowFunctionFrame.scala | 118 ++++++++++-----------
1 file changed, 57 insertions(+), 61 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index d49e5ed5662..a849c3894f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -86,8 +86,7 @@ abstract class OffsetWindowFunctionFrameBase(
expressions: Array[OffsetWindowFunction],
inputSchema: Seq[Attribute],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
- offset: Int,
- ignoreNulls: Boolean)
+ offset: Int)
extends WindowFunctionFrame {
/** Rows of the partition currently being processed. */
@@ -141,8 +140,6 @@ abstract class OffsetWindowFunctionFrameBase(
// is not null.
protected var skippedNonNullCount = 0
- protected val absOffset = Math.abs(offset)
-
// Reset the states by the data of the new partition.
protected def resetStates(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
input = rows
@@ -178,31 +175,6 @@ abstract class OffsetWindowFunctionFrameBase(
}
}
- override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
- if (absOffset > rows.length) {
- fillDefaultValue(EmptyRow)
- } else {
- resetStates(rows)
- if (ignoreNulls) {
- prepareForIgnoreNulls()
- } else {
- prepareForRespectNulls()
- }
- }
- }
-
- protected def prepareForIgnoreNulls(): Unit = findNextRowWithNonNullInput()
-
- protected def prepareForRespectNulls(): Unit
-
- override def write(index: Int, current: InternalRow): Unit = {
- if (input != null) {
- doWrite(index, current)
- }
- }
-
- protected def doWrite(index: Int, current: InternalRow): Unit
-
override def currentLowerBound(): Int = throw new UnsupportedOperationException()
override def currentUpperBound(): Int = throw new UnsupportedOperationException()
@@ -224,15 +196,24 @@ class FrameLessOffsetWindowFunctionFrame(
offset: Int,
ignoreNulls: Boolean = false)
extends OffsetWindowFunctionFrameBase(
- target, ordinal, expressions, inputSchema, newMutableProjection, offset, ignoreNulls) {
+ target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
- override def prepareForRespectNulls(): Unit = {
- // drain the first few rows if offset is larger than zero
- while (inputIndex < offset) {
- if (inputIterator.hasNext) inputIterator.next()
- inputIndex += 1
+ override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+ resetStates(rows)
+ if (ignoreNulls) {
+ if (Math.abs(offset) > rows.length) {
+ fillDefaultValue(EmptyRow)
+ } else {
+ findNextRowWithNonNullInput()
+ }
+ } else {
+ // drain the first few rows if offset is larger than zero
+ while (inputIndex < offset) {
+ if (inputIterator.hasNext) inputIterator.next()
+ inputIndex += 1
+ }
+ inputIndex = offset
}
- inputIndex = offset
}
private val doWrite = if (ignoreNulls && offset > 0) {
@@ -279,6 +260,7 @@ class FrameLessOffsetWindowFunctionFrame(
// 7. current row -> z, next selected row -> y, output: y;
// 8. current row -> v, next selected row -> z, output: z;
// 9. current row -> null, next selected row -> v, output: v;
+ val absOffset = Math.abs(offset)
(current: InternalRow) =>
if (skippedNonNullCount == absOffset) {
nextSelectedRow = EmptyRow
@@ -312,7 +294,7 @@ class FrameLessOffsetWindowFunctionFrame(
inputIndex += 1
}
- protected def doWrite(index: Int, current: InternalRow): Unit = {
+ override def write(index: Int, current: InternalRow): Unit = {
doWrite(current)
}
}
@@ -335,30 +317,35 @@ class UnboundedOffsetWindowFunctionFrame(
offset: Int,
ignoreNulls: Boolean = false)
extends OffsetWindowFunctionFrameBase(
- target, ordinal, expressions, inputSchema, newMutableProjection, offset, ignoreNulls) {
+ target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
assert(offset > 0)
- override def prepareForIgnoreNulls(): Unit = {
- findNextRowWithNonNullInput()
- if (nextSelectedRow == EmptyRow) {
- // Use default values since the offset row whose input value is not null does not exist.
+ override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+ if (offset > rows.length) {
fillDefaultValue(EmptyRow)
} else {
- projection(nextSelectedRow)
- }
- }
-
- override def prepareForRespectNulls(): Unit = {
- var selectedRow: UnsafeRow = null
- // drain the first few rows if offset is larger than one
- while (inputIndex < offset) {
- selectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
- inputIndex += 1
+ resetStates(rows)
+ if (ignoreNulls) {
+ findNextRowWithNonNullInput()
+ if (nextSelectedRow == EmptyRow) {
+ // Use default values since the offset row whose input value is not null does not exist.
+ fillDefaultValue(EmptyRow)
+ } else {
+ projection(nextSelectedRow)
+ }
+ } else {
+ var selectedRow: UnsafeRow = null
+ // drain the first few rows if offset is larger than one
+ while (inputIndex < offset) {
+ selectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+ inputIndex += 1
+ }
+ projection(selectedRow)
+ }
}
- projection(selectedRow)
}
- protected def doWrite(index: Int, current: InternalRow): Unit = {
+ override def write(index: Int, current: InternalRow): Unit = {
// The results are the same for each row in the partition, and have been evaluated in prepare.
// Don't need to recalculate here.
}
@@ -383,18 +370,27 @@ class UnboundedPrecedingOffsetWindowFunctionFrame(
offset: Int,
ignoreNulls: Boolean = false)
extends OffsetWindowFunctionFrameBase(
- target, ordinal, expressions, inputSchema, newMutableProjection, offset, ignoreNulls) {
+ target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
assert(offset > 0)
- override def prepareForRespectNulls(): Unit = {
- // drain the first few rows if offset is larger than one
- while (inputIndex < offset) {
- nextSelectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
- inputIndex += 1
+ override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+ if (offset > rows.length) {
+ fillDefaultValue(EmptyRow)
+ } else {
+ resetStates(rows)
+ if (ignoreNulls) {
+ findNextRowWithNonNullInput()
+ } else {
+ // drain the first few rows if offset is larger than one
+ while (inputIndex < offset) {
+ nextSelectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+ inputIndex += 1
+ }
+ }
}
}
- protected def doWrite(index: Int, current: InternalRow): Unit = {
+ override def write(index: Int, current: InternalRow): Unit = {
if (index >= inputIndex - 1 && nextSelectedRow != null) {
projection(nextSelectedRow)
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]