This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6eb966812b32 [SPARK-51889][PYTHON][SS] Fix a bug for MapState clear() in Python TWS
6eb966812b32 is described below

commit 6eb966812b328a29ccc6ac1531d0a4258e6c90c7
Author: bogao007 <bo....@databricks.com>
AuthorDate: Thu Apr 24 14:43:43 2025 +0900

    [SPARK-51889][PYTHON][SS] Fix a bug for MapState clear() in Python TWS

    ### What changes were proposed in this pull request?

    Fix a bug for MapState clear() in Python TWS

    ### Why are the changes needed?

    Without this change, MapState clear() would not work in Python.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Added unit test

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50686 from bogao007/mapstate-clear.

    Authored-by: bogao007 <bo....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/tests/pandas/helper/helper_pandas_transform_with_state.py | 6 ++++++
 .../python/streaming/TransformWithStateInPySparkStateServer.scala | 3 +++
 2 files changed, 9 insertions(+)

diff --git a/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py
index c376f0fb6836..53f6d77567ae 100644
--- a/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py
@@ -972,6 +972,9 @@ class PandasMapStateProcessor(StatefulProcessor):
         assert next(map_iter)[1] == (value2,)
         self.map_state.removeKey(key1)
         assert not self.map_state.containsKey(key1)
+        assert self.map_state.exists()
+        self.map_state.clear()
+        assert not self.map_state.exists()
         yield pd.DataFrame({"id": key, "countAsString": str(count)})
 
     def close(self) -> None:
@@ -1010,6 +1013,9 @@ class RowMapStateProcessor(StatefulProcessor):
         assert next(map_iter)[1] == (value2,)
         self.map_state.removeKey(key1)
         assert not self.map_state.containsKey(key1)
+        assert self.map_state.exists()
+        self.map_state.clear()
+        assert not self.map_state.exists()
         yield Row(id=key[0], countAsString=str(count))
 
     def close(self) -> None:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
index 1d890b9567f6..541ccf14b06d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
@@ -650,6 +650,9 @@ class TransformWithStateInPySparkStateServer(
           mapStateInfo.keyDeserializer)
         mapStateInfo.mapState.removeKey(keyRow)
         sendResponse(0)
+      case MapStateCall.MethodCase.CLEAR =>
+        mapStateInfo.mapState.clear()
+        sendResponse(0)
       case _ =>
         throw new IllegalArgumentException("Invalid method call")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org