This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 356623add8d [SPARK-40160][PYTHON][DOCS] Make pyspark.broadcast examples self-contained
356623add8d is described below
commit 356623add8d5fd47394b1018fe46fe3c0cb9f814
Author: Qian.Sun <[email protected]>
AuthorDate: Tue Aug 30 11:10:37 2022 +0900
[SPARK-40160][PYTHON][DOCS] Make pyspark.broadcast examples self-contained
### What changes were proposed in this pull request?
This PR proposes to improve the examples in `pyspark.broadcast` by making
each example self-contained, with a brief explanation and slightly more
realistic examples.
### Why are the changes needed?
To make the documentation more readable and directly copy-pastable into the
PySpark shell.
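For instance, the new class-level doctest can be pasted verbatim into a PySpark shell (a minimal sketch; assumes the shell's predefined `spark` session):

```python
# Self-contained: `spark` is predefined in the PySpark shell.
b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
b.value  # [1, 2, 3, 4, 5]
spark.sparkContext.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
# [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
b.unpersist()
```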
### Does this PR introduce _any_ user-facing change?
Yes, it changes the documentation.
### How was this patch tested?
Manually ran each doctest.
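With the `_test()` entry point added below, the doctests can also be re-run standalone; a minimal sketch (assumes `pyspark` is importable from a local Spark build):

```python
# Run the module's doctests via the new _test() entry point,
# equivalent to `python python/pyspark/broadcast.py` from the repo root.
import pyspark.broadcast

pyspark.broadcast._test()  # exits with -1 if any doctest fails
```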
Closes #37629 from dcoliversun/SPARK-40160.
Authored-by: Qian.Sun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/broadcast.py | 138 ++++++++++++++++++++++++++++++++++++++++----
1 file changed, 126 insertions(+), 12 deletions(-)
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index edd282de92f..c163ad2eb77 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -70,16 +70,14 @@ class Broadcast(Generic[T]):
Examples
--------
- >>> from pyspark.context import SparkContext
- >>> sc = SparkContext('local', 'test')
- >>> b = sc.broadcast([1, 2, 3, 4, 5])
+ >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
- >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
+ >>> spark.sparkContext.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
- >>> large_broadcast = sc.broadcast(range(10000))
+ >>> large_broadcast = spark.sparkContext.broadcast(range(10000))
"""
@overload # On driver
@@ -149,6 +147,32 @@ class Broadcast(Generic[T]):
self._path = path
def dump(self, value: T, f: BinaryIO) -> None:
+ """
+ Write a pickled representation of value to the open file or socket.
+ The pickle protocol is HIGHEST_PROTOCOL.
+
+ Parameters
+ ----------
+ value : T
+ Value to write.
+
+ f : :class:`BinaryIO`
+ File or socket where the pickled value will be stored.
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+ Write a pickled representation of `b` to the open temp file.
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "test.txt")
+ ... with open(path, "wb") as f:
+ ... b.dump(b.value, f)
+ """
try:
pickle.dump(value, f, pickle_protocol)
except pickle.PickleError:
@@ -160,11 +184,74 @@ class Broadcast(Generic[T]):
f.close()
def load_from_path(self, path: str) -> T:
+ """
+ Read the pickled representation of an object from the open file and
+ return the reconstituted object hierarchy specified therein.
+
+ Parameters
+ ----------
+ path : str
+ File path from which the pickled value is read.
+
+ Returns
+ -------
+ T
+ The object hierarchy reconstituted from the pickled
+ representation of the object.
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+ >>> c = spark.sparkContext.broadcast(1)
+
+ Read the pickled representation of value from the temp file.
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "test.txt")
+ ... with open(path, "wb") as f:
+ ... b.dump(b.value, f)
+ ... c.load_from_path(path)
+ [1, 2, 3, 4, 5]
+ """
with open(path, "rb", 1 << 20) as f:
return self.load(f)
def load(self, file: BinaryIO) -> T:
- # "file" could also be a socket
+ """
+ Read a pickled representation of value from the open file or socket.
+
+ Parameters
+ ----------
+ file : :class:`BinaryIO`
+ File or socket from which the pickled value is read.
+
+ Returns
+ -------
+ T
+ The object hierarchy reconstituted from the pickled
+ representation of the object.
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+ >>> c = spark.sparkContext.broadcast(1)
+
+ Read the pickled representation of value from the open temp file.
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "test.txt")
+ ... with open(path, "wb") as f:
+ ... b.dump(b.value, f)
+ ... with open(path, "rb") as f:
+ ... c.load(f)
+ [1, 2, 3, 4, 5]
+ """
gc.disable()
try:
return pickle.load(file)
@@ -194,8 +281,16 @@ class Broadcast(Generic[T]):
Parameters
----------
- blocking : bool, optional
- Whether to block until unpersisting has completed
+ blocking : bool, optional, default False
+ Whether to block until unpersisting has completed.
+
+ Examples
+ --------
+ >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+ Delete cached copies of this broadcast on the executors.
+
+ >>> b.unpersist()
"""
if self._jbroadcast is None:
raise RuntimeError("Broadcast can only be unpersisted in driver")
@@ -213,8 +308,16 @@ class Broadcast(Generic[T]):
Parameters
----------
- blocking : bool, optional
- Whether to block until unpersisting has completed
+ blocking : bool, optional, default False
+ Whether to block until unpersisting has completed.
+
+ Examples
+ --------
+ >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+ Destroy all data and metadata related to this broadcast variable.
+
+ >>> b.destroy()
"""
if self._jbroadcast is None:
raise RuntimeError("Broadcast can only be destroyed in driver")
@@ -246,9 +349,20 @@ class BroadcastPickleRegistry(threading.local):
self._registry.clear()
-if __name__ == "__main__":
+def _test() -> None:
import doctest
+ from pyspark.sql import SparkSession
+ import pyspark.broadcast
+
+ globs = pyspark.broadcast.__dict__.copy()
+ spark = SparkSession.builder.master("local[4]").appName("broadcast tests").getOrCreate()
+ globs["spark"] = spark
- (failure_count, test_count) = doctest.testmod()
+ (failure_count, test_count) = doctest.testmod(pyspark.broadcast, globs=globs)
+ spark.stop()
if failure_count:
sys.exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
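Taken together, the new `dump` and `load_from_path` doctests describe a simple pickle round trip. A self-contained sketch outside the doctest harness (assumes a local PySpark installation; the session names are illustrative):

```python
import os
import tempfile

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("broadcast-roundtrip").getOrCreate()
b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])

# Dump the broadcast value to a temp file, then reconstitute it.
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "test.txt")
    with open(path, "wb") as f:
        b.dump(b.value, f)
    print(b.load_from_path(path))  # [1, 2, 3, 4, 5]

spark.stop()
```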