[
https://issues.apache.org/jira/browse/SPARK-39347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
nyingping updated SPARK-39347:
------------------------------
Description:
PR [#35362]([https://github.com/apache/spark/pull/35362]) changed the generation
strategy of the sliding window to the current one, and that change leads to a
new problem.
A wrong window is generated when the timestamp of the record being processed is
negative and the remainder of that time modulo the window length is less than 0,
that is, when (timestamp - startTime) % slideDuration < 0. The current test
cases do not expose this bug.
[ test("negative
timestamps")]([https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala#L299])
{code:java}
val df1 = Seq(
  ("1970-01-01 00:00:02", 1),
  ("1970-01-01 00:00:12", 2)).toDF("time", "value")
val df2 = Seq(
  (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
  (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")

Seq(df1, df2).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
      Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
  )
}
{code}
The timestamps in the test data above are not negative, and their value modulo
the window length is not negative either, so the test case passes.
The problem shows up when the timestamps are changed to something like this.
{code:java}
val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
}
{code}
Running it produces an unexpected result:
{code:java}
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
{code}
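The shifted window starts above are consistent with a remainder that keeps the sign of a negative dividend. The following is only a minimal sketch of that arithmetic, not the Spark implementation: the object `WindowStartSketch` and both helper functions are hypothetical, the shape of the "truncated" formula is an assumption about the affected logic, and timestamps are taken as epoch seconds in UTC for simplicity.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object WindowStartSketch {
  // Hypothetical helper: start of the latest window containing `ts`, computed
  // with the truncated `%` operator (assumed shape of the affected logic).
  def truncatedStart(ts: Long, startTime: Long, slide: Long): Long =
    ts - (ts - startTime + slide) % slide

  // Hypothetical helper: the same computation with a floored remainder, which
  // is never negative for a positive slide duration.
  def flooredStart(ts: Long, startTime: Long, slide: Long): Long =
    ts - Math.floorMod(ts - startTime, slide)

  // Render epoch seconds as a UTC date-time for readability.
  def show(epochSecond: Long): String =
    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC).toString

  def main(args: Array[String]): Unit = {
    val slide = 10L     // "10 seconds" slide duration
    val startTime = 5L  // "5 seconds" start offset
    // First row of the failing test data; equals -86398 epoch seconds in UTC.
    val ts = LocalDateTime.parse("1969-12-31T00:00:02").toEpochSecond(ZoneOffset.UTC)

    println(show(truncatedStart(ts, startTime, slide))) // 1969-12-31T00:00:05
    println(show(flooredStart(ts, startTime, slide)))   // 1969-12-30T23:59:55
  }
}
```

For the passing test data (timestamp 2 seconds after the epoch) both helpers return -5, i.e. 1969-12-31 23:59:55, which is why the existing test does not catch the difference.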
*Benchmark result*
Old logic [#18364]([https://github.com/apache/spark/pull/18364]) vs. the fixed version
{code:java}
Running benchmark: tumbling windows
  Running case: old logic
  Stopped after 407 iterations, 10012 ms
  Running case: new logic
  Stopped after 615 iterations, 10007 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
tumbling windows:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
old logic                      17             25           9       580.1           1.7       1.0X
new logic                      15             16           2       680.8           1.5       1.2X

Running benchmark: sliding windows
  Running case: old logic
  Stopped after 10 iterations, 10296 ms
  Running case: new logic
  Stopped after 15 iterations, 10391 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
sliding windows:    Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
old logic                    1000           1030          19        10.0         100.0       1.0X
new logic                     668            693          21        15.0          66.8       1.5X
{code}
Compared with PR [#38069]([https://github.com/apache/spark/pull/35362]), the
fixed version loses a bit of performance.
> Generate wrong time window when (timestamp-startTime) % slideDuration < 0
> -------------------------------------------------------------------------
>
> Key: SPARK-39347
> URL: https://issues.apache.org/jira/browse/SPARK-39347
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.3.0
> Reporter: nyingping
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)