[ https://issues.apache.org/jira/browse/SPARK-39347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
nyingping updated SPARK-39347:
------------------------------
Description:
h2. Bug description

Since [SPARK-38069 (PR #35362)|https://github.com/apache/spark/pull/35362] changed the generation strategy of the sliding window to the current one, a new problem arises: when a record's event time is negative and its absolute value is greater than the window length, the generated window is wrong.

The previous test cases did not fully cover this situation. For example, in the test case [DataFrameTimeWindowingSuite.scala#negative timestamps|#negative timestamps]:
{code:java}
val df1 = Seq(
  ("1970-01-01 00:00:02", 1),
  ("1970-01-01 00:00:12", 2)).toDF("time", "value")
val df2 = Seq(
  (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
  (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")

Seq(df1, df2).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
      Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
  )
}
{code}
In this test case, the original data timestamp "1970-01-01 00:00:02" is greater than 0, and the absolute value of the generated window start time "1969-12-31 23:59:55" is less than the window size "10 seconds", so the test passes. But a problem occurs when a timestamp is negative and its absolute value is greater than the window length.
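The failure mode can be sketched with plain epoch-second arithmetic. The snippet below is a simplified, hypothetical model of the window-start calculation, not the actual Spark source: on the JVM, {{%}} takes the sign of its dividend, so once the value being divided goes negative the remainder goes negative and the computed window start lands after the event. Normalizing the remainder into {{[0, slide)}} (one possible fix, not necessarily the merged patch) reproduces the expected answers:
{code:java}
public class WindowStartDemo {
    // Hypothetical model of the current strategy (illustration only):
    // last window start = t - (t - startOffset + slide) % slide
    static long currentStart(long t, long slide, long startOffset) {
        return t - (t - startOffset + slide) % slide;
    }

    // Possible fix: force the remainder into [0, slide) so negative
    // timestamps behave like positive ones.
    static long fixedStart(long t, long slide, long startOffset) {
        return t - (((t - startOffset) % slide) + slide) % slide;
    }

    public static void main(String[] args) {
        long slide = 10L, startOffset = 5L;

        long tPos = 2L;      // "1970-01-01 00:00:02" in epoch seconds
        long tNeg = -86398L; // "1969-12-31 00:00:02" in epoch seconds

        // (2 - 5 + 10) % 10 = 7, start = 2 - 7 = -5
        // = "1969-12-31 23:59:55": correct.
        System.out.println(currentStart(tPos, slide, startOffset)); // -5

        // (-86398 - 5 + 10) % 10 = -3, start = -86398 + 3 = -86395
        // = "1969-12-31 00:00:05", which starts AFTER the event: wrong.
        System.out.println(currentStart(tNeg, slide, startOffset)); // -86395

        // Normalized remainder is 7, start = -86405
        // = "1969-12-30 23:59:55": correct.
        System.out.println(fixedStart(tNeg, slide, startOffset));   // -86405
    }
}
{code}
Converted back to timestamps, these three values match the passing case above and the wrong vs. expected window starts in the failing test case.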
{code:java}
val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
}
{code}
Running this gives:
{code:java}
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
{code}
h2. Fix

Since this was caused by my PR, I am extremely sorry and I will fix this bug.
h2. Performance

Benchmark result: [old logic (PR #18364)|https://github.com/apache/spark/pull/18364] vs. the fixed version:
{code:java}
Running benchmark: tumbling windows
  Running case: old logic
  Stopped after 407 iterations, 10012 ms
  Running case: new logic
  Stopped after 615 iterations, 10007 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
tumbling windows:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
old logic                      17             25           9       580.1           1.7       1.0X
new logic                      15             16           2       680.8           1.5       1.2X

Running benchmark: sliding windows
  Running case: old logic
  Stopped after 10 iterations, 10296 ms
  Running case: new logic
  Stopped after 15 iterations, 10391 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
sliding windows:    Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
old logic                    1000           1030          19        10.0         100.0       1.0X
new logic                     668            693          21        15.0          66.8       1.5X
{code}
Compared with PR [#35362|https://github.com/apache/spark/pull/35362], the fixed version loses only about 0.1X of performance.

> Generate wrong time window when time <0 && abs(time) > window.slideDuration
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-39347
>                 URL: https://issues.apache.org/jira/browse/SPARK-39347
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.0
>            Reporter: nyingping
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.7#820007)