This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c11585ac296e [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect
c11585ac296e is described below

commit c11585ac296eb726e6356bfcc7628a2c948e1d2f
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Sun Apr 7 18:11:12 2024 +0900

    [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to make `pyspark.worker_utils` compatible with `pyspark-connect`.
    
    ### Why are the changes needed?
    
    This is needed so that `pyspark-connect` can work without the classic PySpark packages and dependencies.
    Spark Connect does not support `Broadcast` and `Accumulator`.
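
    As a minimal illustration (not part of this commit, and assuming a
    `pyspark-connect` environment where the classic-only modules are absent),
    the gating pattern applied in `pyspark/worker_util.py` looks roughly like
    this:

        from pyspark.util import is_remote_only

        if not is_remote_only():
            # These modules ship only with classic PySpark; pyspark-connect
            # does not bundle them, so import them lazily.
            from pyspark.accumulators import _accumulatorRegistry
            from pyspark.core.broadcast import Broadcast, _broadcastRegistry
            from pyspark.core.files import SparkFiles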
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested via https://github.com/apache/spark/pull/45870. Once CI is set up there, it will be tested properly.
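
    A hypothetical local sanity check (assuming a pyspark-connect-only
    environment without classic PySpark installed) is simply importing the
    module, which previously failed on the now-gated imports:

        # Should no longer raise ImportError for classic-only modules.
        import pyspark.worker_util  # noqa: F401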
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45914 from HyukjinKwon/SPARK-47751.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/worker_util.py | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py
index f3c59c91ea2c..22389decac2f 100644
--- a/python/pyspark/worker_util.py
+++ b/python/pyspark/worker_util.py
@@ -32,10 +32,8 @@ try:
 except ImportError:
     has_resource_module = False
 
-from pyspark.accumulators import _accumulatorRegistry
-from pyspark.core.broadcast import Broadcast, _broadcastRegistry
+from pyspark.util import is_remote_only
 from pyspark.errors import PySparkRuntimeError
-from pyspark.core.files import SparkFiles
 from pyspark.util import local_connect_and_auth
 from pyspark.serializers import (
     read_bool,
@@ -59,8 +57,11 @@ def add_path(path: str) -> None:
 
 
 def read_command(serializer: FramedSerializer, file: IO) -> Any:
+    if not is_remote_only():
+        from pyspark.core.broadcast import Broadcast
+
     command = serializer._read_with_length(file)
-    if isinstance(command, Broadcast):
+    if not is_remote_only() and isinstance(command, Broadcast):
         command = serializer.loads(command.value)
     return command
 
@@ -125,8 +126,12 @@ def setup_spark_files(infile: IO) -> None:
     """
     # fetch name of workdir
     spark_files_dir = utf8_deserializer.loads(infile)
-    SparkFiles._root_directory = spark_files_dir
-    SparkFiles._is_running_on_worker = True
+
+    if not is_remote_only():
+        from pyspark.core.files import SparkFiles
+
+        SparkFiles._root_directory = spark_files_dir
+        SparkFiles._is_running_on_worker = True
 
     # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
     add_path(spark_files_dir)  # *.py files that were added will be copied here
@@ -142,6 +147,9 @@ def setup_broadcasts(infile: IO) -> None:
     """
     Set up broadcasted variables.
     """
+    if not is_remote_only():
+        from pyspark.core.broadcast import Broadcast, _broadcastRegistry
+
     # fetch names and values of broadcast variables
     needs_broadcast_decryption_server = read_bool(infile)
     num_broadcast_variables = read_int(infile)
@@ -175,6 +183,11 @@ def send_accumulator_updates(outfile: IO) -> None:
     """
     Send the accumulator updates back to JVM.
     """
-    write_int(len(_accumulatorRegistry), outfile)
-    for aid, accum in _accumulatorRegistry.items():
-        pickleSer._write_with_length((aid, accum._value), outfile)
+    if not is_remote_only():
+        from pyspark.accumulators import _accumulatorRegistry
+
+        write_int(len(_accumulatorRegistry), outfile)
+        for aid, accum in _accumulatorRegistry.items():
+            pickleSer._write_with_length((aid, accum._value), outfile)
+    else:
+        write_int(0, outfile)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
