This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 230c2ddd170 [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvider test
230c2ddd170 is described below

commit 230c2ddd1706aae99d166ec6df2b7c3269699630
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Sun May 22 11:15:24 2022 +0900

    [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvider test
    
    ### What changes were proposed in this pull request?
    Fix awaitOffset to wait for committedOffset to reach at least the expected offset, instead of the exact value, for long offsets. Also fixed the RateStreamProvider test to use only the row values for the requested range. Basically, for numerically increasing offsets, awaitOffset could have been called after the stream had already moved past the expected newOffset, or committedOffsets could have changed after the notify. In this case it's safe to exit, since at least the given offset has been reached and the equality condition might never be met.
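
    As a rough sketch (with hypothetical, simplified stand-in offset types -- not the actual StreamExecution code), the fixed wait condition looks like this:

    ```scala
    // Hypothetical stand-in offset types; Spark's real LongOffset lives in
    // org.apache.spark.sql.execution.streaming.
    sealed trait Offset
    case class LongOffset(offset: Long) extends Offset {
      override def toString: String = offset.toString
    }
    case class JsonOffset(json: String) extends Offset

    // Returns true while awaitOffset should keep waiting for the source's
    // committed offset to catch up to `expected`.
    def notCaughtUp(committed: Option[Offset], expected: Offset): Boolean =
      (committed, expected) match {
        case (None, _) => true // nothing committed for this source yet
        // SPARK-39242: for long offsets, compare "at least" instead of equality,
        // since the stream may already have moved past `expected`. Assumes the
        // committed offset's toString parses as a Long, as in the patch.
        case (Some(c), LongOffset(e)) => c.toString.toLong < e
        case (Some(c), other)         => c != other
      }

    // The stream moved past the expected offset: stop waiting.
    assert(!notCaughtUp(Some(LongOffset(7)), LongOffset(5)))
    // An exact match also ends the wait.
    assert(!notCaughtUp(Some(LongOffset(5)), LongOffset(5)))
    // Still behind: keep waiting.
    assert(notCaughtUp(Some(LongOffset(3)), LongOffset(5)))
    println("ok")
    ```

    With the old equality-based condition, a wake-up that arrives after the committed offset has advanced past `expected` would wait forever; the at-least comparison makes such a wake-up terminate the wait.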
    
    ### Why are the changes needed?
    Fixes a bug in the awaitOffset logic and in the RateStreamProvider test.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    With the current code, we were seeing race conditions where the test would hang and get terminated after a 60 min timeout. With the change, we ran the specific test in batches of 100 runs, several times over, and verified that the test failure no longer occurs. We were also able to simulate the failure by introducing arbitrary sleeps in the affected code paths, and ensured that the test passes with the above fix. Also added a small unit test for longOffset conversion validation.
    
    ```
    [info] RateStreamProviderSuite:
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for avro / Test / testOnly
    11:08:40.357 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for hive / Test / testOnly
    [info] - RateStreamProvider in registry (531 milliseconds)
    [info] - compatible with old path in registry (3 milliseconds)
    [warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
    11:08:43.608 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for mllib / Test / testOnly
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for sql-kafka-0-10 / Test / testOnly
    [info] - microbatch - basic (3 seconds, 966 milliseconds)
    11:08:45.807 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for repl / Test / testOnly
    11:08:48.493 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
    [warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for examples / Test / testOnly
    [info] - microbatch - restart (4 seconds, 365 milliseconds)
    11:08:50.278 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
    [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
    [info] No tests to run for assembly / Test / testOnly
    [info] - microbatch - uniform distribution of event timestamps (696 milliseconds)
    [info] - microbatch - infer offsets (98 milliseconds)
    [info] - microbatch - predetermined batch size (86 milliseconds)
    [info] - microbatch - data read (85 milliseconds)
    [info] - valueAtSecond (0 milliseconds)
    [info] - overflow (265 milliseconds)
    [info] - illegal option values (4 milliseconds)
    [info] - user-specified schema given (9 milliseconds)
    [info] - continuous data (1 second, 6 milliseconds)
    11:08:55.295 WARN org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite:

    ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.sources.RateStreamProviderSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true), state-store-maintenance-task (daemon=true) =====
    [info] Run completed in 17 seconds, 887 milliseconds.
    [info] Total number of tests run: 15
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 15, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.

    (18:59:59) INFO: Current date is 2022-05-19
    (18:59:59) INFO: Analyzed target //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 (0 packages loaded, 0 targets configured).
    (18:59:59) INFO: Found 1 test target...
    Target //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 up-to-date:
      bazel-bin/sql/core/org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2
    (19:09:58) INFO: Elapsed time: 599.874s, Critical Path: 124.19s
    (19:09:58) INFO: 101 processes: 1 internal, 100 linux-sandbox.

    //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 PASSED in 23.7s
      Stats over 100 runs: max = 23.7s, min = 17.5s, avg = 19.7s, dev = 1.5s

    Executed 1 out of 1 test: 1 test passes.
    ```
    
    Closes #36620 from anishshri-db/bfix/SPARK-39242.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/execution/streaming/StreamExecution.scala      | 12 +++++++++++-
 .../streaming/sources/RateStreamProviderSuite.scala          |  7 ++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5d06afbbf61..ae99743baf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -445,7 +445,17 @@ abstract class StreamExecution(
         false
       } else {
         val source = sources(sourceIndex)
-        !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+        // SPARK-39242 For numeric increasing offsets, we could have called awaitOffset
+        // after the stream has moved past the expected newOffset or if committedOffsets
+        // changed after notify. In this case, its safe to exit, since at-least the given
+        // Offset has been reached and the equality condition might never be met.
+        if (!localCommittedOffsets.contains(source)) {
+          true
+        } else if (newOffset.isInstanceOf[LongOffset]) {
+          localCommittedOffsets(source).toString.toLong < newOffset.asInstanceOf[LongOffset].offset
+        } else {
+          localCommittedOffsets(source) != newOffset
+        }
       }
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index a2657718914..cb3769ef8a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -96,8 +96,13 @@ class RateStreamProviderSuite extends StreamTest {
 
     // We have to use the lambda version of CheckAnswer because we don't know the right range
     // until we see the last offset.
+    // SPARK-39242 - its possible that the next output to sink has happened
+    // since the last query progress and the output rows reflect that.
+    // We just need to compare for the saved stream duration here and hence
+    // we only use those number of sorted elements from output rows.
     def expectedResultsFromDuration(rows: Seq[Row]): Unit = {
-      assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10)))
+      assert(rows.map(_.getLong(0)).sorted.take(streamDuration * 10)
+        == (0 until (streamDuration * 10)))
     }
 
     testStream(input)(
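
The test-side change above can be illustrated with hypothetical values: the sink may receive extra rows after the last recorded query progress, so the assertion compares only the first `streamDuration * 10` sorted values against the expected range.

```scala
val streamDuration = 1
// Rows observed at the sink; 10 and 11 arrived after the measured window.
val observed = Seq(3L, 0L, 10L, 1L, 7L, 4L, 11L, 2L, 8L, 5L, 9L, 6L)
val expected = 0L until streamDuration * 10L

// The old equality check fails because of the trailing extra rows.
assert(observed.sorted != expected)
// The fixed check compares only the saved stream duration's worth of rows.
assert(observed.sorted.take(streamDuration * 10) == expected)
println("ok")
```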


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
