This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fe3d3ae6172b [SPARK-50719][PYTHON] Support `interruptOperation` for
PySpark
fe3d3ae6172b is described below
commit fe3d3ae6172b88734c33da30e900ed3bfae1417c
Author: Haejoon Lee <[email protected]>
AuthorDate: Fri Jan 10 12:23:10 2025 +0800
[SPARK-50719][PYTHON] Support `interruptOperation` for PySpark
### What changes were proposed in this pull request?
This PR proposes to support `interruptOperation` for PySpark
### Why are the changes needed?
For feature parity with Spark Connect
### Does this PR introduce _any_ user-facing change?
No, this adds a new API
### How was this patch tested?
The existing CI should pass
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49423 from itholic/interrupt_operation.
Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../docs/source/reference/pyspark.sql/spark_session.rst | 2 +-
python/pyspark/sql/session.py | 17 ++++++++++++-----
python/pyspark/sql/tests/test_connect_compatibility.py | 1 -
python/pyspark/sql/tests/test_session.py | 1 -
4 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst
b/python/docs/source/reference/pyspark.sql/spark_session.rst
index a35fccbcffe9..0d6a1bc79b90 100644
--- a/python/docs/source/reference/pyspark.sql/spark_session.rst
+++ b/python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -53,6 +53,7 @@ See also :class:`SparkSession`.
SparkSession.getActiveSession
SparkSession.getTags
SparkSession.interruptAll
+ SparkSession.interruptOperation
SparkSession.interruptTag
SparkSession.newSession
SparkSession.profile
@@ -88,6 +89,5 @@ Spark Connect Only
SparkSession.clearProgressHandlers
SparkSession.client
SparkSession.copyFromLocalToFs
- SparkSession.interruptOperation
SparkSession.registerProgressHandler
SparkSession.removeProgressHandler
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index fc434cd16bfb..5ab186b2957e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -2253,13 +2253,15 @@ class SparkSession(SparkConversionMixin):
return python_list
- @remote_only
def interruptOperation(self, op_id: str) -> List[str]:
"""
Interrupt an operation of this session with the given operationId.
.. versionadded:: 3.5.0
+ .. versionchanged:: 4.0.0
+ Supports Spark Classic.
+
Returns
-------
list of str
@@ -2269,10 +2271,15 @@ class SparkSession(SparkConversionMixin):
-----
There is still a possibility of operation finishing just as it is
interrupted.
"""
- raise PySparkRuntimeError(
- errorClass="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
- messageParameters={"feature": "SparkSession.interruptOperation"},
- )
+ java_list = self._jsparkSession.interruptOperation(op_id)
+ python_list = list()
+
+ # Use iterator to manually iterate through Java list
+ java_iterator = java_list.iterator()
+ while java_iterator.hasNext():
+ python_list.append(str(java_iterator.next()))
+
+ return python_list
def addTag(self, tag: str) -> None:
"""
diff --git a/python/pyspark/sql/tests/test_connect_compatibility.py
b/python/pyspark/sql/tests/test_connect_compatibility.py
index 25b8be1f9ac7..4ac68292b402 100644
--- a/python/pyspark/sql/tests/test_connect_compatibility.py
+++ b/python/pyspark/sql/tests/test_connect_compatibility.py
@@ -266,7 +266,6 @@ class ConnectCompatibilityTestsMixin:
"addArtifacts",
"clearProgressHandlers",
"copyFromLocalToFs",
- "interruptOperation",
"newSession",
"registerProgressHandler",
"removeProgressHandler",
diff --git a/python/pyspark/sql/tests/test_session.py
b/python/pyspark/sql/tests/test_session.py
index a22fe777e3c9..c21247e3159c 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -227,7 +227,6 @@ class SparkSessionTests3(unittest.TestCase,
PySparkErrorTestUtils):
(lambda: session.client, "client"),
(session.addArtifacts, "addArtifact(s)"),
(lambda: session.copyFromLocalToFs("", ""),
"copyFromLocalToFs"),
- (lambda: session.interruptOperation(""), "interruptOperation"),
]
for func, name in unsupported:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]