This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 69d433bcfd5a [SPARK-50387][SS] Update condition for timer expiry and relevant test
69d433bcfd5a is described below
commit 69d433bcfd5a2d69f3cd7f8c4e310a3b5854fc74
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Nov 26 13:54:36 2024 +0900
[SPARK-50387][SS] Update condition for timer expiry and relevant test
### What changes were proposed in this pull request?
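Change the timer expiry condition in `TimerStateImpl` from `<` to `<=`, so that a timer whose timestamp is equal to the expiry timestamp is treated as expired. Update the relevant tests accordingly.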
### Why are the changes needed?
To make the expiry and removal conditions consistent. With this change, we also no longer have to wait for an extra microbatch to expire timers in certain cases (for example, when a timer's timestamp is exactly equal to the expiry timestamp).
### Does this PR introduce _any_ user-facing change?
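Yes. Timers whose timestamp equals the expiry timestamp are now considered expired. Such timers can therefore fire one microbatch earlier than before.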
### How was this patch tested?
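Updated the existing unit tests (`TimerSuite` and the PySpark `transformWithStateInPandas` event-time test) to reflect the new expiry condition.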
```
[info] Run completed in 4 seconds, 638 milliseconds.
[info] Total number of tests run: 12
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48927 from anishshri-db/task/SPARK-50387.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../pyspark/sql/tests/pandas/test_pandas_transform_with_state.py | 8 +++++---
.../apache/spark/sql/execution/streaming/TimerStateImpl.scala | 2 +-
.../apache/spark/sql/execution/streaming/state/TimerSuite.scala | 9 +++++----
3 files changed, 11 insertions(+), 8 deletions(-)
diff --git
a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 8901f09e9272..514339249818 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -553,9 +553,11 @@ class TransformWithStateInPandasTestsMixin:
Row(id="a-expired", timestamp="0"),
}
else:
- # watermark has not progressed, so timer registered in batch
1(watermark = 10)
- # has not yet expired
- assert set(batch_df.sort("id").collect()) == {Row(id="a",
timestamp="15")}
+ # verify that rows and expired timer produce the expected
result
+ assert set(batch_df.sort("id").collect()) == {
+ Row(id="a", timestamp="15"),
+ Row(id="a-expired", timestamp="10000"),
+ }
self._test_transform_with_state_in_pandas_event_time(
EventTimeStatefulProcessor(), check_results
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
index d0fbaf660060..5d20f53449c5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
@@ -178,7 +178,7 @@ class TimerStateImpl(
val rowPair = iter.next()
val keyRow = rowPair.key
val result = getTimerRowFromSecIndex(keyRow)
- if (result._2 < expiryTimestampMs) {
+ if (result._2 <= expiryTimestampMs) {
result
} else {
finished = true
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
index 24a120be9d9a..428845d5ebcb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
@@ -72,8 +72,9 @@ class TimerSuite extends StateVariableSuiteBase {
assert(timerState1.listTimers().toSet === Set(15000L, 1000L))
assert(timerState1.getExpiredTimers(Long.MaxValue).toSeq ===
Seq(("test_key", 1000L), ("test_key", 15000L)))
- // if timestamp equals to expiryTimestampsMs, will not considered expired
- assert(timerState1.getExpiredTimers(15000L).toSeq === Seq(("test_key",
1000L)))
+ // if timestamp equals to expiryTimestampsMs, it will be considered
expired
+ assert(timerState1.getExpiredTimers(15000L).toSeq ===
+ Seq(("test_key", 1000L), ("test_key", 15000L)))
assert(timerState1.listTimers().toSet === Set(15000L, 1000L))
timerState1.registerTimer(20L * 1000)
@@ -128,7 +129,7 @@ class TimerSuite extends StateVariableSuiteBase {
timerTimerstamps.foreach(timerState.registerTimer)
assert(timerState.getExpiredTimers(Long.MaxValue).toSeq.map(_._2) ===
timerTimerstamps.sorted)
assert(timerState.getExpiredTimers(4200L).toSeq.map(_._2) ===
- timerTimerstamps.sorted.takeWhile(_ < 4200L))
+ timerTimerstamps.sorted.takeWhile(_ <= 4200L))
assert(timerState.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
ImplicitGroupingKeyTracker.removeImplicitKey()
}
@@ -162,7 +163,7 @@ class TimerSuite extends StateVariableSuiteBase {
(timerTimestamps1 ++ timerTimestamps2 ++ timerTimerStamps3).sorted)
assert(timerState1.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
assert(timerState1.getExpiredTimers(8000L).toSeq.map(_._2) ===
- (timerTimestamps1 ++ timerTimestamps2 ++
timerTimerStamps3).sorted.takeWhile(_ < 8000L))
+ (timerTimestamps1 ++ timerTimestamps2 ++
timerTimerStamps3).sorted.takeWhile(_ <= 8000L))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]