This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6099edc  [SPARK-36764][SS][TEST] Fix race-condition on "ensure continuous stream is being used" in KafkaContinuousTest
6099edc is described below

commit 6099edc66eb35db548230eeaba791c730eb38f84
Author: Jungtaek Lim <[email protected]>
AuthorDate: Fri Sep 17 21:28:02 2021 +0800

    [SPARK-36764][SS][TEST] Fix race-condition on "ensure continuous stream is being used" in KafkaContinuousTest
    
    ### What changes were proposed in this pull request?
    
    The test “ensure continuous stream is being used” in KafkaContinuousTest quickly checks the actual type of the execution and then stops the query. In continuous mode, a streaming query is stopped by interrupting the query execution thread and joining it with an indefinite timeout.
    
    In parallel, the started streaming query generates its execution plan, which includes running the optimizer. Some parts of SessionState can be built at that time because they are defined as lazy. The problem is that some of them seem to “swallow” the InterruptedException and let the thread keep running.
    
    As a result, the query cannot tell that a stop has been requested, so it never stops.
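    
    The following minimal, hypothetical Scala sketch makes the failure mode concrete. It is not Spark code, and `SwallowedInterruptDemo`, `startWorker`, and `interruptAndJoin` are illustrative names. It assumes a worker that is stopped as described above, by interrupting its thread and joining it, and that exits only when a blocking step reports the interrupt. When `Thread.sleep` throws `InterruptedException`, it clears the thread's interrupt flag. A worker that catches and ignores the exception therefore never learns about the stop request.

```scala
// Hypothetical illustration (not Spark code): a worker stopped by
// interrupting its thread and then joining it.
object SwallowedInterruptDemo {

  // The worker repeatedly performs a blocking step (Thread.sleep stands in for
  // work such as lazily building state). It exits only if the step reports the
  // interrupt. A "swallowing" step ignores InterruptedException, and because
  // sleep clears the interrupt flag when it throws, the request is lost.
  def startWorker(swallow: Boolean): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        var stopRequested = false
        while (!stopRequested) {
          try {
            Thread.sleep(10)
          } catch {
            case _: InterruptedException =>
              if (!swallow) stopRequested = true
          }
        }
      }
    })
    t.setDaemon(true) // a swallowing worker never exits; don't block JVM exit
    t.start()
    t
  }

  // Interrupts the worker and waits at most `timeoutMs` for it to finish.
  // Returns true if the thread terminated within the timeout.
  def interruptAndJoin(t: Thread, timeoutMs: Long): Boolean = {
    t.interrupt()
    t.join(timeoutMs)
    !t.isAlive
  }
}
```

    With an indefinite join (`t.join()`), the swallowing case would block forever, which is the hang described above. The sketch uses a bounded timeout only so that it terminates.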
    
    This PR fixes the scenario by ensuring that the streaming query has started before the test stops it: the test now runs `makeSureGetOffsetCalled` before checking the execution type.
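    
    The idea behind the fix can be sketched the same way. This is a hypothetical illustration, not how `makeSureGetOffsetCalled` or Spark's test harness is implemented. A `CountDownLatch` stands in, as an assumption, for "the query has started". The worker releases the latch once its interrupt-swallowing initialization is done. The stopping side waits on the latch before interrupting, so the interrupt can only arrive in code that honors it.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical illustration (not Spark's StreamTest API): wait until a worker
// has passed its interrupt-swallowing initialization before stopping it.
object WaitBeforeStopDemo {

  final class Worker {
    // Counted down once initialization has finished.
    val initialized = new CountDownLatch(1)

    val thread: Thread = new Thread(new Runnable {
      override def run(): Unit = {
        // Stand-in for lazy initialization that swallows interrupts.
        try {
          Thread.sleep(100)
        } catch {
          case _: InterruptedException => ()
        }
        initialized.countDown()
        // Main loop: honors interrupts by exiting.
        try {
          while (true) Thread.sleep(10)
        } catch {
          case _: InterruptedException => ()
        }
      }
    })
    thread.setDaemon(true)
    thread.start()
  }

  // Blocks until the worker is initialized, then interrupts and joins it.
  // Returns true if the thread terminated within `timeoutMs`.
  def stopAfterInitialized(w: Worker, timeoutMs: Long): Boolean = {
    w.initialized.await()
    w.thread.interrupt()
    w.thread.join(timeoutMs)
    !w.thread.isAlive
  }
}
```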
    
    ### Why are the changes needed?
    
    The race condition could make the test hang until the test framework marks it as timed out.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #34004 from HeartSaVioR/SPARK-36764.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 9ee8cbf..4b6a5b8 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -91,6 +91,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
       .load()
 
     testStream(query)(
+      makeSureGetOffsetCalled,
       Execute(q => assert(q.isInstanceOf[ContinuousExecution]))
     )
   }
