This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 9df850d [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
9df850d is described below
commit 9df850df9e210372124e579b78e6bfb5aac6ab15
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Aug 11 20:10:59 2021 -0700
[SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
### What changes were proposed in this pull request?
This PR proposes to remove the watermark filter applied to input rows in
SessionWindowStateStoreSaveExec, since SessionWindowStateStoreSaveExec is
expected to store all inputs into the state store and apply eviction later.
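The "store everything, then evict" ordering can be illustrated with a small toy model. This is only a sketch: `SessionStoreSketch`, `Session`, and the `putToStore` and `evict` helpers below are hypothetical stand-ins and not Spark APIs, and the eviction rule (a session is evicted once its end is at or before the watermark) is an assumption made for the example. It does not claim that the previous Spark code misbehaved this way; no failing case was found for it.

```scala
// Toy model only: none of these names are Spark APIs.
object SessionStoreSketch {
  final case class Session(key: String, startMs: Long, endMs: Long)

  // Replace the stored sessions of every key present in the input
  // with the input sessions for that key (no watermark filtering here).
  def putToStore(
      input: Seq[Session],
      store: Map[String, Seq[Session]]): Map[String, Seq[Session]] =
    store ++ input.groupBy(_.key)

  // Evict sessions whose end is at or before the watermark (assumed rule).
  // Returns the evicted sessions and the remaining store contents.
  def evict(
      store: Map[String, Seq[Session]],
      watermarkMs: Long): (Seq[Session], Map[String, Seq[Session]]) = {
    val evicted = store.values.flatten.filter(_.endMs <= watermarkMs).toSeq
    val remaining = store
      .map { case (k, sessions) => k -> sessions.filter(_.endMs > watermarkMs) }
      .filter { case (_, sessions) => sessions.nonEmpty }
    (evicted, remaining)
  }

  def main(args: Array[String]): Unit = {
    val watermarkMs = 100L
    val input = Seq(Session("a", 10L, 50L), Session("a", 120L, 150L))

    // Store all inputs first, then evict: the old session reaches the
    // eviction step and is returned from it.
    val (emitted, remaining) = evict(putToStore(input, Map.empty), watermarkMs)
    println(s"store-then-evict emitted=$emitted remaining=$remaining")

    // Filter before storing: the old session never reaches the store,
    // so the eviction step has nothing to return for it.
    val filtered = input.filter(_.endMs > watermarkMs)
    val (emittedFiltered, _) = evict(putToStore(filtered, Map.empty), watermarkMs)
    println(s"filter-then-store emitted=$emittedFiltered")
  }
}
```

In this toy model the two orderings differ only in whether the session ending before the watermark is seen by the eviction step, which is the logical concern the change addresses.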
### Why are the changes needed?
The existing code is logically incorrect, though I couldn't reproduce an
actual problem.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests. I couldn't come up with a case that fails on the previous
code, but the logic can be reviewed instead.
Closes #33708 from HeartSaVioR/SPARK-36480.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit fac4e5eb3eae4cafe7bd6672911792612c2aaca0)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../spark/sql/execution/streaming/statefulOperators.scala | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 0a2d5ad..3431823 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -585,7 +585,12 @@ case class SessionWindowStateStoreRestoreExec(
}
/**
- * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]].
+ * This class replaces existing sessions for the grouping key with new sessions in state store.
+ * All inputs are valid on storing into state store; don't filter out via watermark while storing.
+ * Refer the method doc of [[StreamingSessionWindowStateManager.updateSessions]] for more details.
+ *
+ * This class will provide the output according to the output mode.
+ * Update mode is not supported as the semantic is not feasible for session window.
*/
case class SessionWindowStateStoreSaveExec(
keyWithoutSessionExpressions: Seq[Attribute],
@@ -642,9 +647,7 @@ case class SessionWindowStateStoreSaveExec(
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
allUpdatesTimeMs += timeTakenMs {
- val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
- watermarkPredicateForData.get)
- putToStore(filteredIter, store)
+ putToStore(iter, store)
}
val removalStartTimeNs = System.nanoTime