This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50dd494a7b1a [SPARK-55018][PYTHON][TEST] Fix test_convergence by not overflowing the list
50dd494a7b1a is described below

commit 50dd494a7b1af099d1d4804eb473705d2337c746
Author: Tian Gao <[email protected]>
AuthorDate: Tue Jan 13 17:17:31 2026 +0800

    [SPARK-55018][PYTHON][TEST] Fix test_convergence by not overflowing the list
    
    ### What changes were proposed in this pull request?
    
    Fixed test_convergence by only filling the list if a batch is not empty.
    
    ### Why are the changes needed?
    
    The check below asserts that `len(models) == len(input_batches)`, but `input_stream.foreachRDD` keeps firing on every batch interval even after all of `input_batches` has been consumed. That means the callback appends a new element to the list every second, regardless of whether any real input remains.
    
    If the condition check misses the exact second at which `len(models)` happens to be 20, the length grows to 21 and the condition can never become true.
    
    This also invalidates the subsequent check: the last two models would be identical, so the diff check between them is meaningless.
    
    We should not update `models` if we did not get a real batch.
    
    Increasing the interval in `eventually` made this failure more frequent.
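    The failure mode above can be reproduced without Spark. The sketch below is illustrative only (the names `run`, `guard_empty`, and the fake batch data are not from the patch); it models a callback that fires on every micro-batch interval, showing that unconditional appends overshoot the expected length while skipping empty batches keeps it stable:

    ```python
    # Minimal standalone sketch of the race in test_convergence.
    # A streaming callback fires once per interval even after the real
    # input is exhausted; empty "batches" then pad the list forever.

    def run(ticks, batches, guard_empty):
        models = []
        for t in range(ticks):
            # After the input is exhausted, each tick delivers an empty batch.
            batch = batches[t] if t < len(batches) else []
            if guard_empty and not batch:
                continue  # the fix: do not record a model for an empty batch
            models.append(len(models))  # stand-in for slr.latestModel().weights[0]
        return models

    batches = [[1], [2], [3]]  # three real input batches
    unguarded = run(10, batches, guard_empty=False)
    guarded = run(10, batches, guard_empty=True)
    # len(unguarded) == 10: a check `len(models) == 3` is only true for an
    # instant and can be missed.  len(guarded) == 3: stable once input ends.
    ```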
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Locally I set the check interval very high and the test still passed.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53779 from gaogaotiantian/fix-convergence.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/mllib/tests/test_streaming_algorithms.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/mllib/tests/test_streaming_algorithms.py b/python/pyspark/mllib/tests/test_streaming_algorithms.py
index 14724d6c46e2..b5567f0ce682 100644
--- a/python/pyspark/mllib/tests/test_streaming_algorithms.py
+++ b/python/pyspark/mllib/tests/test_streaming_algorithms.py
@@ -238,7 +238,12 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
         slr = StreamingLogisticRegressionWithSGD(stepSize=0.2, numIterations=25)
         slr.setInitialWeights([0.0])
         slr.trainOn(input_stream)
-        input_stream.foreachRDD(lambda x: models.append(slr.latestModel().weights[0]))
+
+        def update_models(x):
+            if not x.isEmpty():
+                models.append(slr.latestModel().weights[0])
+
+        input_stream.foreachRDD(update_models)
 
         self.ssc.start()
 
@@ -247,7 +252,7 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
             return True
 
         # We want all batches to finish for this test.
-        eventually(timeout=120, catch_assertions=True)(condition)()
+        eventually(timeout=120, catch_assertions=True, interval=12.0)(condition)()
 
         t_models = array(models)
         diff = t_models[1:] - t_models[:-1]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
