[ https://issues.apache.org/jira/browse/SPARK-39347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
nyingping updated SPARK-39347:
------------------------------
Summary: Generate wrong time window when (timestamp-startTime) % slideDuration < 0 (was: Generate wrong time window when (timestamp-startTim) % slideDuration < 0)
> Generate wrong time window when (timestamp-startTime) % slideDuration < 0
> -------------------------------------------------------------------------
>
> Key: SPARK-39347
> URL: https://issues.apache.org/jira/browse/SPARK-39347
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.3.0
> Reporter: nyingping
> Priority: Major
>
> h2. Bug description
> Since PR [#35362|https://github.com/apache/spark/pull/35362] (SPARK-38069) changed the
> strategy used to generate sliding windows to the current one, a new problem has arisen.
> When the event time of a record is negative (before the epoch) and its absolute value
> is greater than the window length, the generated window is wrong.
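The summary points at the JVM remainder operator: `%` truncates toward zero, so a negative dividend such as `timestamp - startTime` for a pre-epoch timestamp yields a negative remainder instead of one in `[0, slideDuration)`. A minimal Scala sketch contrasting `%` with `java.lang.Math.floorMod` (the object and method names are hypothetical, chosen only for illustration):

```scala
// Hypothetical demo object: contrasts truncating `%` with flooring `floorMod`.
object RemainderDemo {
  // JVM `%` truncates toward zero: the sign of the result follows the dividend.
  def truncRem(a: Long, b: Long): Long = a % b

  // Math.floorMod rounds toward negative infinity: for b > 0 the result is in [0, b).
  def floorRem(a: Long, b: Long): Long = java.lang.Math.floorMod(a, b)

  def main(args: Array[String]): Unit = {
    println(truncRem(7L, 10L))  // 7
    println(truncRem(-3L, 10L)) // -3
    println(floorRem(-3L, 10L)) // 7
  }
}
```

Any window-alignment formula built on `%` therefore has to account for the negative case explicitly.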
>
> The existing test cases did not cover this scenario. For example, consider the
> "negative timestamps" test in [DataFrameTimeWindowingSuite.scala#negative timestamps|#negative
> timestamps]:
>
> {code:java}
> val df1 = Seq(
>   ("1970-01-01 00:00:02", 1),
>   ("1970-01-01 00:00:12", 2)).toDF("time", "value")
> val df2 = Seq(
>   (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
>   (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")
> Seq(df1, df2).foreach { df =>
>   checkAnswer(
>     df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
>       .orderBy($"window.start".asc)
>       .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
>     Seq(
>       Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
>       Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
>   )
> }
> {code}
>
>
> In this test case, the input timestamp "1970-01-01 00:00:02" is positive, and the
> absolute value of the generated window start time "1969-12-31 23:59:55" (5 seconds
> before the epoch) is less than the window size "10 seconds", so the test passes.
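Why this case passes can be worked through by hand. The sketch below assumes the window start is computed as `timestamp - (timestamp - startTime + slideDuration) % slideDuration`; that is our reading of the #35362 logic, not code quoted in this issue. It also treats the wall-clock strings as seconds since the epoch in UTC (Spark itself works in microseconds), and the object and method names are hypothetical. Adding `slideDuration` lifts `2 - 5 = -3` to `7`, so the remainder stays non-negative:

```scala
// Hypothetical helper reconstructing the ASSUMED #35362-style formula (seconds, not micros).
object PassingCase {
  def lastStartAssumed(ts: Long, startTime: Long, slide: Long): Long =
    ts - (ts - startTime + slide) % slide

  def main(args: Array[String]): Unit = {
    val ts = 2L // "1970-01-01 00:00:02" as seconds since the epoch (UTC assumed)
    // (2 - 5 + 10) % 10 = 7, so the window starts at 2 - 7 = -5
    val start = lastStartAssumed(ts, startTime = 5L, slide = 10L)
    println(start)      // -5, i.e. "1969-12-31 23:59:55"
    println(start + 10) // 5, i.e. "1970-01-01 00:00:05" (window end)
  }
}
```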
>
> However, the window is wrong when a timestamp is negative and its absolute value is
> greater than the window length:
>
> {code:java}
> val df3 = Seq(
>   ("1969-12-31 00:00:02", 1),
>   ("1969-12-31 00:00:12", 2)).toDF("time", "value")
> val df4 = Seq(
>   (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
>   (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")
> Seq(df3, df4).foreach { df =>
>   checkAnswer(
>     df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
>       .orderBy($"window.start".asc)
>       .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
>     Seq(
>       Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
>       Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
>   )
> }
> {code}
> Running it produces:
>
>
> {code:java}
> == Results ==
> !== Correct Answer - 2 ==                      == Spark Answer - 2 ==
> !struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
> ![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
> ![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
> {code}
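Applying the assumed formula `timestamp - (timestamp - startTime + slideDuration) % slideDuration` (a reconstruction, not code quoted from the PR) to "1969-12-31 00:00:02", i.e. -86398 seconds since the epoch in UTC, reproduces the Spark Answer column: adding `slideDuration` is no longer enough to make the dividend non-negative, so the remainder is -3 and the computed start lands after the timestamp itself. Object and method names are hypothetical:

```scala
// Hypothetical helper using the ASSUMED #35362-style formula (seconds, not micros).
object FailingCase {
  def lastStartAssumed(ts: Long, startTime: Long, slide: Long): Long =
    ts - (ts - startTime + slide) % slide

  def main(args: Array[String]): Unit = {
    val ts = -86400L + 2L // "1969-12-31 00:00:02" = -86398 s (UTC assumed)
    // (-86398 - 5 + 10) % 10 = -86393 % 10 = -3, a negative remainder
    val start = lastStartAssumed(ts, startTime = 5L, slide = 10L)
    println(start)      // -86395, i.e. "1969-12-31 00:00:05"
    println(start > ts) // true: the record falls outside its own window
  }
}
```

The correct start is -86405 ("1969-12-30 23:59:55"), one full slide earlier.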
>
>
> h2. Fix
> Since this bug was introduced by my PR, I apologize and will fix it.
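One way to fix this, sketched under stated assumptions (seconds instead of microseconds, UTC, hypothetical names) and not necessarily what the actual patch does, is to normalize the remainder into `[0, slideDuration)` with `java.lang.Math.floorMod`, which floors instead of truncating:

```scala
// A sketch of ONE possible fix; names are hypothetical, units are seconds.
object FixedWindowStart {
  // floorMod keeps the remainder in [0, slide) even for timestamps before the epoch.
  def lastStart(ts: Long, startTime: Long, slide: Long): Long =
    ts - java.lang.Math.floorMod(ts - startTime, slide)

  def main(args: Array[String]): Unit = {
    // "1970-01-01 00:00:02" -> -5, i.e. "1969-12-31 23:59:55"
    println(lastStart(2L, 5L, 10L))
    // "1969-12-31 00:00:02" -> -86405, i.e. "1969-12-30 23:59:55"
    println(lastStart(-86398L, 5L, 10L))
  }
}
```

An equivalent branch form, closer to the condition in the summary, is `val r = (ts - startTime) % slide; if (r < 0) r + slide else r`. Both give the expected window starts for the passing and the failing test.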
>
> h2. Performance
> Benchmark results: [old logic (#18364)|https://github.com/apache/spark/pull/18364] vs. the fixed version
> {code:java}
> Running benchmark: tumbling windows
> Running case: old logic
> Stopped after 407 iterations, 10012 ms
> Running case: new logic
> Stopped after 615 iterations, 10007 ms
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
> Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
> tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------------------------------------
> old logic                                            17             25           9        580.1           1.7       1.0X
> new logic                                            15             16           2        680.8           1.5       1.2X
> Running benchmark: sliding windows
> Running case: old logic
> Stopped after 10 iterations, 10296 ms
> Running case: new logic
> Stopped after 15 iterations, 10391 ms
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
> Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
> sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------------------------------------
> old logic                                          1000           1030          19         10.0         100.0       1.0X
> new logic                                           668            693          21         15.0          66.8       1.5X
> {code}
> Compared with PR [#35362|https://github.com/apache/spark/pull/35362], the fixed version gives up a small amount of performance.
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)