This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 230c2ddd170 [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvider test
230c2ddd170 is described below

commit 230c2ddd1706aae99d166ec6df2b7c3269699630
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Sun May 22 11:15:24 2022 +0900

    [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvider test

### What changes were proposed in this pull request?

Fix awaitOffset to wait for committedOffset to reach at least the expected offset, instead of the exact value, for long offsets. Also fixed the RateStreamProvider test to use only the row values in the requested range. Basically, for numerically increasing offsets, awaitOffset could have been called after the stream has already moved past the expected newOffset, or committedOffsets could have changed after notify. In this case, it's safe to exit, since at least the given Offset has been reached and the equality condition migh [...]

### Why are the changes needed?

Fixes a bug in the awaitOffset logic and in the RateStreamProvider test.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

With the current code, we were seeing race conditions where the test would hang and get terminated after a 60 min timeout. With the change, we ran the specific test 100 times, multiple times, and verified that we no longer see the test failure. We were also able to simulate the failure by introducing an arbitrary sleep in the affected code paths, and verified that the test passes with the above fix. Also added a small unit test for LongOffset conversion validation.

```
[info] RateStreamProviderSuite:
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for avro / Test / testOnly
11:08:40.357 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for hive / Test / testOnly
[info] - RateStreamProvider in registry (531 milliseconds)
[info] - compatible with old path in registry (3 milliseconds)
[warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
11:08:43.608 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for mllib / Test / testOnly
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for sql-kafka-0-10 / Test / testOnly
[info] - microbatch - basic (3 seconds, 966 milliseconds)
11:08:45.807 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for repl / Test / testOnly
11:08:48.493 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for examples / Test / testOnly
[info] - microbatch - restart (4 seconds, 365 milliseconds)
11:08:50.278 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for assembly / Test / testOnly
[info] - microbatch - uniform distribution of event timestamps (696 milliseconds)
[info] - microbatch - infer offsets (98 milliseconds)
[info] - microbatch - predetermined batch size (86 milliseconds)
[info] - microbatch - data read (85 milliseconds)
[info] - valueAtSecond (0 milliseconds)
[info] - overflow (265 milliseconds)
[info] - illegal option values (4 milliseconds)
[info] - user-specified schema given (9 milliseconds)
[info] - continuous data (1 second, 6 milliseconds)
11:08:55.295 WARN org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.sources.RateStreamProviderSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true), state-store-maintenance-task (daemon=true) =====
[info] Run completed in 17 seconds, 887 milliseconds.
[info] Total number of tests run: 15
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 15, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
(18:59:59) INFO: Current date is 2022-05-19
(18:59:59) INFO: Analyzed target //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 (0 packages loaded, 0 targets configured).
(18:59:59) INFO: Found 1 test target...
Target //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 up-to-date:
  bazel-bin/sql/core/org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2
(19:09:58) INFO: Elapsed time: 599.874s, Critical Path: 124.19s
(19:09:58) INFO: 101 processes: 1 internal, 100 linux-sandbox.
//sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 PASSED in 23.7s
  Stats over 100 runs: max = 23.7s, min = 17.5s, avg = 19.7s, dev = 1.5s
Executed 1 out of 1 test: 1 test passes.
```

Closes #36620 from anishshri-db/bfix/SPARK-39242.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/execution/streaming/StreamExecution.scala      | 12 +++++++++++-
 .../streaming/sources/RateStreamProviderSuite.scala          |  7 ++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5d06afbbf61..ae99743baf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -445,7 +445,17 @@ abstract class StreamExecution(
       false
     } else {
       val source = sources(sourceIndex)
-      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+      // SPARK-39242 For numeric increasing offsets, we could have called awaitOffset
+      // after the stream has moved past the expected newOffset or if committedOffsets
+      // changed after notify. In this case, its safe to exit, since at-least the given
+      // Offset has been reached and the equality condition might never be met.
+      if (!localCommittedOffsets.contains(source)) {
+        true
+      } else if (newOffset.isInstanceOf[LongOffset]) {
+        localCommittedOffsets(source).toString.toLong < newOffset.asInstanceOf[LongOffset].offset
+      } else {
+        localCommittedOffsets(source) != newOffset
+      }
     }
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index a2657718914..cb3769ef8a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -96,8 +96,13 @@ class RateStreamProviderSuite extends StreamTest {
     // We have to use the lambda version of CheckAnswer because we don't know the right range
     // until we see the last offset.
+    // SPARK-39242 - its possible that the next output to sink has happened
+    // since the last query progress and the output rows reflect that.
+    // We just need to compare for the saved stream duration here and hence
+    // we only use those number of sorted elements from output rows.
     def expectedResultsFromDuration(rows: Seq[Row]): Unit = {
-      assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10)))
+      assert(rows.map(_.getLong(0)).sorted.take(streamDuration * 10)
+        == (0 until (streamDuration * 10)))
     }

     testStream(input)(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
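The heart of the StreamExecution change is replacing an equality wait with an ordering wait for numeric offsets. A minimal sketch of that predicate, using simplified stand-in types (`LongOffsetSim` is hypothetical; the real code uses Spark's `LongOffset` inside `StreamExecution.awaitOffset`):

```scala
// Sketch of the fixed awaitOffset wait condition, for illustration only.
object AwaitOffsetSketch {
  final case class LongOffsetSim(offset: Long)

  // Returns true while the caller must keep waiting: either nothing has
  // been committed for this source yet, or the committed offset is still
  // below the expected one. Using '<' instead of '!=' means a stream that
  // races past the expected offset no longer makes the wait hang forever.
  def shouldWait(committed: Option[LongOffsetSim], expected: LongOffsetSim): Boolean =
    committed match {
      case None    => true
      case Some(c) => c.offset < expected.offset
    }

  def main(args: Array[String]): Unit = {
    assert(shouldWait(None, LongOffsetSim(5)))                    // nothing committed yet
    assert(shouldWait(Some(LongOffsetSim(3)), LongOffsetSim(5)))  // not there yet
    assert(!shouldWait(Some(LongOffsetSim(5)), LongOffsetSim(5))) // reached exactly
    assert(!shouldWait(Some(LongOffsetSim(7)), LongOffsetSim(5))) // raced past: safe to exit
    println("awaitOffset sketch OK")
  }
}
```

With the old equality-style check, the last case (committed offset already past the expected one) could keep waiting until the 60 min timeout; with the ordering check it exits immediately.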
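The test-side change is a prefix comparison: the rate source may keep emitting rows after the measured window, so the assertion sorts the output and compares only the first `streamDuration * 10` values. A self-contained sketch of that idea (the row values here are simulated; the real test reads them from the streaming sink):

```scala
// Sketch of the prefix comparison used by the fixed test. The stream may
// produce rows past the measured window; sorting and taking only the
// expected count makes the assertion insensitive to those extras.
object PrefixCompareSketch {
  def main(args: Array[String]): Unit = {
    val streamDuration = 2 // seconds, illustrative
    val expected = (0L until (streamDuration * 10).toLong).toList

    // Simulated sink output: 5 extra rows emitted after the window, unsorted.
    val rows: Seq[Long] = scala.util.Random.shuffle((0L until 25L).toList)

    // Comparing the full sorted sequence with '==' would fail here; the
    // prefix of expected length still matches exactly.
    val observed = rows.sorted.take(streamDuration * 10)
    assert(observed == expected)
    println("prefix comparison OK")
  }
}
```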