This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d314e4bba54a [SPARK-46250][CONNECT][SS] Deflake test_parity_listener
d314e4bba54a is described below

commit d314e4bba54a08babad8f8ecd564e7440c0e97f4
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Tue Dec 5 08:48:23 2023 +0900

    [SPARK-46250][CONNECT][SS] Deflake test_parity_listener
    
    ### What changes were proposed in this pull request?
    
    We didn't give the onQueryTerminated handler enough time to write its event data
    to the table, so it was possible to read from the not-yet-created table
    `listener_terminated_event`.
    
    ### Why are the changes needed?
    
    Deflake existing tests

    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Test only change
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #44166 from WweiL/SPARK-46250-deflaky-parity-listener.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/connect/streaming/test_parity_listener.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py
index ca02cf29ee7d..4fc040642bed 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py
@@ -68,11 +68,14 @@ class StreamingListenerParityTests(StreamingListenerTestsMixin, ReusedConnectTes
             )
 
             self.assertTrue(q.isActive)
-            time.sleep(10)
-            self.assertTrue(q.lastProgress["batchId"] > 0)  # ensure at least one batch is ran
+            # ensure at least one batch is ran
+            while q.lastProgress is None or q.lastProgress["batchId"] == 0:
+                time.sleep(5)
             q.stop()
             self.assertFalse(q.isActive)
 
+            time.sleep(60)  # Sleep to make sure listener_terminated_events is written successfully
+
             start_event = pyspark.cloudpickle.loads(
                 self.spark.read.table("listener_start_events").collect()[0][0]
             )
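As a side note, the patch replaces a fixed `time.sleep(10)` with a polling loop for batch progress, but still uses a fixed `time.sleep(60)` for the terminated-event table. The same polling idea could cover that case too. Below is a minimal, hypothetical `wait_until` helper illustrating the pattern; it is not part of the patch, and the names and timeouts are assumptions for illustration only:

```python
import time


def wait_until(condition, timeout=120.0, interval=5.0):
    """Poll `condition` until it returns True or `timeout` seconds elapse.

    Unlike a fixed sleep, this returns as soon as the condition holds and
    fails loudly with TimeoutError instead of silently reading stale state.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(interval)
    raise TimeoutError("condition not met within %s seconds" % timeout)
```

In a test like this one, it could be used as, e.g., `wait_until(lambda: q.lastProgress is not None and q.lastProgress["batchId"] > 0)`, or to wait for `listener_terminated_events` to become readable, trading the fixed 60-second pause for an upper bound that only pays the full cost when something is actually wrong.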


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
