This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a87fa834224b [SPARK-51697][PYTHON][SS] Fix list state test failure in TransformWithStateInPandas
a87fa834224b is described below

commit a87fa834224b235f3d73dab2eb774118aa76a294
Author: bogao007 <[email protected]>
AuthorDate: Thu Apr 3 09:46:09 2025 +0900

    [SPARK-51697][PYTHON][SS] Fix list state test failure in TransformWithStateInPandas
    
    ### What changes were proposed in this pull request?
    
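    Fix the failure in `test_transform_with_state_in_pandas_list_state`: the batch check now asserts on the output only for batch 0 and stops all active streaming queries on any later batch.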
    
    ### Why are the changes needed?
    
    Unblock the 4.0 release.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Test-only change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50501 from bogao007/4.0-list-state-fix.
    
    Authored-by: bogao007 <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/tests/pandas/test_pandas_transform_with_state.py   | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 8cefb148829c..dc104704e169 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -259,11 +259,15 @@ class TransformWithStateInPandasTestsMixin:
         }
 
     def test_transform_with_state_in_pandas_list_state(self):
-        def check_results(batch_df, _):
-            assert set(batch_df.sort("id").collect()) == {
-                Row(id="0", countAsString="2"),
-                Row(id="1", countAsString="2"),
-            }
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:
+                for q in self.spark.streams.active:
+                    q.stop()
 
         self._test_transform_with_state_in_pandas_basic(
             ListStateProcessor(), check_results, True, "processingTime"
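
The control flow of the new `check_results` callback can be sketched in plain Python, without a Spark session. This is only an illustration of the pattern in the diff (assert on batch 0, stop every active query on later batches). `FakeQuery` and `make_check_results` are hypothetical names introduced here, rows are modeled as plain tuples rather than `Row` objects, and none of this is PySpark API.

```python
from dataclasses import dataclass


@dataclass
class FakeQuery:
    """Hypothetical stand-in for a streaming query; it only records stop()."""
    stopped: bool = False

    def stop(self):
        self.stopped = True


def make_check_results(active_queries, expected):
    """Build a batch callback that mirrors the gating logic in the diff."""
    def check_results(batch_rows, batch_id):
        if batch_id == 0:
            # Only the first micro-batch is compared against the expected rows.
            assert set(batch_rows) == expected
        else:
            # Any later batch just shuts down all active queries.
            for q in active_queries:
                q.stop()
    return check_results


# Assumed data: (id, countAsString) pairs matching the rows in the test.
expected = {("0", "2"), ("1", "2")}
queries = [FakeQuery(), FakeQuery()]
check = make_check_results(queries, expected)

check([("1", "2"), ("0", "2")], 0)   # batch 0: assertion runs, nothing stops
first_batch_stopped = [q.stopped for q in queries]

check([], 1)                          # batch 1: no assertion, queries stop
later_batch_stopped = [q.stopped for q in queries]
```

In this sketch a later batch with different or empty contents no longer trips the assertion, which matches the shape of the change. The commit itself does not state why later batches occur in the real test.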


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
