This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9dc3c1ebe8e [SPARK-55335][PYTHON][TESTS] Use eventually instead of 
hard-coded wait for datasource test
d9dc3c1ebe8e is described below

commit d9dc3c1ebe8e05818a0653fa5e9b60b694bf5110
Author: Tian Gao <[email protected]>
AuthorDate: Thu Feb 5 11:03:15 2026 +0800

    [SPARK-55335][PYTHON][TESTS] Use eventually instead of hard-coded wait for 
datasource test
    
    ### What changes were proposed in this pull request?
    
    Instead of a hard-coded `time.sleep(6)`, poll with `eventually` until 
some rows have been committed.
    
    ### Why are the changes needed?
    
    Coverage runs are slower than normal runs, which can leave this test with 
no committed rows after the fixed sleep. Using `eventually` retries until some 
rows are committed (up to a timeout) without always waiting longer than needed.
    
    https://github.com/apache/spark/actions/runs/21586206793/job/62195041694
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passed locally.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54110 from gaogaotiantian/fix-sleep-test.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../sql/tests/test_python_streaming_datasource.py  | 55 ++++++++++++----------
 1 file changed, 30 insertions(+), 25 deletions(-)

diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py 
b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index a5ae69de1d96..deec97d0da72 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -35,6 +35,7 @@ from pyspark.testing.sqlutils import (
     pyarrow_requirement_message,
 )
 from pyspark.testing import assertDataFrameEqual
+from pyspark.testing.utils import eventually
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 
 
@@ -399,41 +400,45 @@ class BasePythonStreamingDataSourceTestsMixin:
                 .start()
             )
 
-            # Wait a bit for data to be processed, then stop
-            time.sleep(6)  # Allow a few batches to run
-            query.stop()
-            query.awaitTermination()
-
-            # Since we're writing actual JSON files, verify commit metadata 
and written files
-            commit_files = [f for f in os.listdir(temp_dir) if 
f.startswith("commit_")]
-            self.assertTrue(len(commit_files) > 0, "No commit files were 
created")
+            @eventually(
+                timeout=20,
+                interval=2.0,
+                catch_assertions=True,
+                expected_exceptions=(json.JSONDecodeError,),
+            )
+            def check():
+                # Since we're writing actual JSON files, verify commit 
metadata and written files
+                commit_files = [f for f in os.listdir(temp_dir) if 
f.startswith("commit_")]
+                self.assertTrue(len(commit_files) > 0, "No commit files were 
created")
+
+                # Read and verify commit metadata - check all commits for any 
with data
+                total_committed_rows = 0
+                total_committed_batches = 0
+
+                for commit_file in commit_files:
+                    with open(os.path.join(temp_dir, commit_file), "r") as f:
+                        commit_data = json.load(f)
+                        total_committed_rows += commit_data.get("total_rows", 
0)
+                        total_committed_batches += 
commit_data.get("total_batches", 0)
+
+                self.assertTrue(
+                    total_committed_rows > 0,
+                    f"Expected committed data but got {total_committed_rows} 
rows",
+                )
 
-            # Read and verify commit metadata - check all commits for any with 
data
-            total_committed_rows = 0
-            total_committed_batches = 0
+            check()
 
-            for commit_file in commit_files:
-                with open(os.path.join(temp_dir, commit_file), "r") as f:
-                    commit_data = json.load(f)
-                    total_committed_rows += commit_data.get("total_rows", 0)
-                    total_committed_batches += 
commit_data.get("total_batches", 0)
+            query.stop()
+            query.awaitTermination()
 
-            # We should have both committed data AND JSON files written
             json_files = [
                 f
                 for f in os.listdir(temp_dir)
                 if f.startswith("partition_") and f.endswith(".json")
             ]
 
-            # Verify that we have both committed data AND JSON files
-            has_committed_data = total_committed_rows > 0
-            has_json_files = len(json_files) > 0
-
-            self.assertTrue(
-                has_committed_data, f"Expected committed data but got 
{total_committed_rows} rows"
-            )
             self.assertTrue(
-                has_json_files, f"Expected JSON files but found 
{len(json_files)} files"
+                len(json_files) > 0, f"Expected JSON files but found 
{len(json_files)} files"
             )
 
             # Verify JSON files contain valid data


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to