This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9df850d  [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
9df850d is described below

commit 9df850df9e210372124e579b78e6bfb5aac6ab15
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Aug 11 20:10:59 2021 -0700

    [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to remove the filter applied to input rows against the watermark in SessionWindowStateStoreSaveExec. SessionWindowStateStoreSaveExec is expected to store all inputs into the state store and apply eviction later.
    
    ### Why are the changes needed?
    
    The code is logically incorrect, though I can't reproduce an actual failure caused by it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests. I can't come up with a case that fails on the previous code, but the logic can be verified by review instead.
    
    Closes #33708 from HeartSaVioR/SPARK-36480.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
    (cherry picked from commit fac4e5eb3eae4cafe7bd6672911792612c2aaca0)
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 .../spark/sql/execution/streaming/statefulOperators.scala     | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 0a2d5ad..3431823 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -585,7 +585,12 @@ case class SessionWindowStateStoreRestoreExec(
 }
 
 /**
- * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]].
+ * This class replaces existing sessions for the grouping key with new sessions in state store.
+ * All inputs are valid on storing into state store; don't filter out via watermark while storing.
+ * Refer the method doc of [[StreamingSessionWindowStateManager.updateSessions]] for more details.
+ *
+ * This class will provide the output according to the output mode.
+ * Update mode is not supported as the semantic is not feasible for session window.
  */
 case class SessionWindowStateStoreSaveExec(
     keyWithoutSessionExpressions: Seq[Attribute],
@@ -642,9 +647,7 @@ case class SessionWindowStateStoreSaveExec(
         // Assumption: watermark predicates must be non-empty if append mode is allowed
         case Some(Append) =>
           allUpdatesTimeMs += timeTakenMs {
-            val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
-              watermarkPredicateForData.get)
-            putToStore(filteredIter, store)
+            putToStore(iter, store)
           }
 
           val removalStartTimeNs = System.nanoTime
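
The commit message does not give a failing case. What follows is only a toy Scala model of the two orderings touched by the second hunk:
- filtering merged sessions by the watermark before replacing a key's sessions;
- storing all of them and evicting afterwards.

It is not Spark's StateStore or StreamingSessionWindowStateManager API. Everything in it (`Session`, `SessionStoreSketch`, `replaceSessions`, `evictExpired`) is hypothetical. It assumes a session becomes evictable once its end is at or below the watermark.

```scala
// Hypothetical toy model; NOT Spark's StateStore / StreamingSessionWindowStateManager API.
final case class Session(key: String, start: Long, end: Long)

object SessionStoreSketch {
  // Grouping key -> sessions currently held in the (in-memory) "state store".
  type Store = Map[String, Seq[Session]]

  // Replaces all stored sessions of each key that appears in the input with the
  // input's sessions for that key (loosely mirroring "replace existing sessions").
  def replaceSessions(input: Iterator[Session], store: Store): Store =
    input.toSeq.groupBy(_.key).foldLeft(store) { case (acc, (k, sessions)) =>
      acc.updated(k, sessions.sortBy(_.start))
    }

  // Evicts sessions whose end is at or below the watermark (assumed rule) and
  // returns the remaining store together with the evicted sessions, which an
  // append-mode operator would emit as output.
  def evictExpired(store: Store, watermarkMs: Long): (Store, Seq[Session]) = {
    val (evicted, kept) = store.values.flatten.partition(_.end <= watermarkMs)
    val remaining: Store =
      kept.toSeq.groupBy(_.key).map { case (k, s) => k -> s.sortBy(_.start) }
    (remaining, evicted.toSeq.sortBy(s => (s.key, s.start)))
  }
}
```

Take a stored session a:[0,10], merged input sessions a:[0,10] and a:[50,60], and a watermark of 20. Storing everything and then evicting emits a:[0,10]. Filtering first replaces the key's sessions with a:[50,60] only, so in this toy model a:[0,10] leaves the store without ever being emitted. Whether Spark's real operator can reach such a state is exactly what the commit message leaves open.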
