This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eb966812b32 [SPARK-51889][PYTHON][SS] Fix a bug for MapState clear() 
in Python TWS
6eb966812b32 is described below

commit 6eb966812b328a29ccc6ac1531d0a4258e6c90c7
Author: bogao007 <bo....@databricks.com>
AuthorDate: Thu Apr 24 14:43:43 2025 +0900

    [SPARK-51889][PYTHON][SS] Fix a bug for MapState clear() in Python TWS
    
    ### What changes were proposed in this pull request?
    
    Fix a bug for MapState clear() in Python TWS
    
    ### Why are the changes needed?
    
    Without this change, MapState clear() would not work in Python.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50686 from bogao007/mapstate-clear.
    
    Authored-by: bogao007 <bo....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/tests/pandas/helper/helper_pandas_transform_with_state.py   | 6 ++++++
 .../python/streaming/TransformWithStateInPySparkStateServer.scala   | 3 +++
 2 files changed, 9 insertions(+)

diff --git 
a/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py 
b/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py
index c376f0fb6836..53f6d77567ae 100644
--- 
a/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py
+++ 
b/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py
@@ -972,6 +972,9 @@ class PandasMapStateProcessor(StatefulProcessor):
         assert next(map_iter)[1] == (value2,)
         self.map_state.removeKey(key1)
         assert not self.map_state.containsKey(key1)
+        assert self.map_state.exists()
+        self.map_state.clear()
+        assert not self.map_state.exists()
         yield pd.DataFrame({"id": key, "countAsString": str(count)})
 
     def close(self) -> None:
@@ -1010,6 +1013,9 @@ class RowMapStateProcessor(StatefulProcessor):
         assert next(map_iter)[1] == (value2,)
         self.map_state.removeKey(key1)
         assert not self.map_state.containsKey(key1)
+        assert self.map_state.exists()
+        self.map_state.clear()
+        assert not self.map_state.exists()
         yield Row(id=key[0], countAsString=str(count))
 
     def close(self) -> None:
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
index 1d890b9567f6..541ccf14b06d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
@@ -650,6 +650,9 @@ class TransformWithStateInPySparkStateServer(
           mapStateInfo.keyDeserializer)
         mapStateInfo.mapState.removeKey(keyRow)
         sendResponse(0)
+      case MapStateCall.MethodCase.CLEAR =>
+        mapStateInfo.mapState.clear()
+        sendResponse(0)
       case _ =>
         throw new IllegalArgumentException("Invalid method call")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to