This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 10e600f04ae [SPARK-40077][PYTHON][DOCS] Make pyspark.context examples self-contained
10e600f04ae is described below
commit 10e600f04ae194b7857c9822926784b88b160f10
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Aug 16 09:37:28 2022 +0900
[SPARK-40077][PYTHON][DOCS] Make pyspark.context examples self-contained
### What changes were proposed in this pull request?
1. Make pyspark.context examples self-contained
2. Add missing `versionadded` directives
3. Add `See Also` sections
### Why are the changes needed?
To make the documentation more readable and directly copy-and-pasteable into the PySpark shell.
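For illustration, a self-contained example in the new style creates its own temporary data instead of relying on the shared `tempdir` test fixture, so it can be pasted into a PySpark shell as-is (a minimal sketch, assuming an active SparkContext bound to `sc`):

    >>> import os
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as d:
    ...     path = os.path.join(d, "sample-text")
    ...     # write a small temporary text file, then read it back
    ...     sc.parallelize(["Hello", "world"]).saveAsTextFile(path)
    ...     collected = sorted(sc.textFile(path).collect())
    >>> collected
    ['Hello', 'world']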
### Does this PR introduce _any_ user-facing change?
Documentation changes only.
### How was this patch tested?
- Added doctests.
- Manually copy-pasted the examples into the PySpark shell.
- Built the documentation and checked it manually.
Closes #37517 from zhengruifeng/py_doc_sc_self_contained.
Lead-authored-by: Ruifeng Zheng <[email protected]>
Co-authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/context.py | 1028 +++++++++++++++++++++++++++++++++++++++------
1 file changed, 903 insertions(+), 125 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 032efaef492..67ac8c720cd 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -45,7 +45,7 @@ from typing import (
from py4j.java_collections import JavaMap
from py4j.protocol import Py4JError
-from pyspark import accumulators, since
+from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast, BroadcastPickleRegistry
from pyspark.conf import SparkConf
@@ -112,26 +112,24 @@ class SparkContext:
environment : dict, optional
A dictionary of environment variables to set on
worker nodes.
- batchSize : int, optional
+ batchSize : int, optional, default 0
The number of Python objects represented as a single
Java object. Set 1 to disable batching, 0 to automatically choose
the batch size based on object sizes, or -1 to use an unlimited
batch size
- serializer : :class:`pyspark.serializers.Serializer`, optional
+ serializer : :class:`Serializer`, optional, default :class:`CPickleSerializer`
The serializer for RDDs.
- conf : :py:class:`pyspark.SparkConf`, optional
+ conf : :class:`SparkConf`, optional
An object setting Spark properties.
- gateway : :py:class:`py4j.java_gateway.JavaGateway`, optional
+ gateway : :class:`py4j.java_gateway.JavaGateway`, optional
Use an existing gateway and JVM, otherwise a new JVM
will be instantiated. This is only used internally.
- jsc : :py:class:`py4j.java_gateway.JavaObject`, optional
+ jsc : :class:`py4j.java_gateway.JavaObject`, optional
The JavaSparkContext instance. This is only used internally.
- profiler_cls : type, optional
+ profiler_cls : type, optional, default :class:`BasicProfiler`
A class of custom Profiler used to do profiling
- (default is :class:`pyspark.profiler.BasicProfiler`).
- udf_profiler_cls : type, optional
+ udf_profiler_cls : type, optional, default :class:`UDFBasicProfiler`
A class of custom Profiler used to do udf profiling
- (default is :class:`pyspark.profiler.UDFBasicProfiler`).
Notes
-----
@@ -477,11 +475,25 @@ class SparkContext:
@classmethod
def getOrCreate(cls, conf: Optional[SparkConf] = None) -> "SparkContext":
"""
- Get or instantiate a SparkContext and register it as a singleton object.
+ Get or instantiate a :class:`SparkContext` and register it as a singleton object.
+
+ .. versionadded:: 1.4.0
Parameters
----------
- conf : :py:class:`pyspark.SparkConf`, optional
+ conf : :class:`SparkConf`, optional
+ :class:`SparkConf` that will be used for initialization of the :class:`SparkContext`.
+
+ Returns
+ -------
+ :class:`SparkContext`
+ current :class:`SparkContext`, or a new one if it wasn't created before the function call.
+
+ Examples
+ --------
+ >>> SparkContext.getOrCreate()
+ <SparkContext ...>
"""
with SparkContext._lock:
if SparkContext._active_spark_context is None:
@@ -493,14 +505,34 @@ class SparkContext:
"""
Control our logLevel. This overrides any user-defined log settings.
Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
+
+ .. versionadded:: 1.4.0
+
+ Parameters
+ ----------
+ logLevel : str
+ The desired log level as a string.
+
+ Examples
+ --------
+ >>> sc.setLogLevel("WARN") # doctest: +SKIP
"""
self._jsc.setLogLevel(logLevel)
@classmethod
def setSystemProperty(cls, key: str, value: str) -> None:
"""
- Set a Java system property, such as spark.executor.memory. This must
- must be invoked before instantiating SparkContext.
+ Set a Java system property, such as `spark.executor.memory`. This must
+ be invoked before instantiating :class:`SparkContext`.
+
+ .. versionadded:: 0.9.0
+
+ Parameters
+ ----------
+ key : str
+ The key of a new Java system property.
+ value : str
+ The value of a new Java system property.
"""
SparkContext._ensure_initialized()
assert SparkContext._jvm is not None
@@ -510,6 +542,12 @@ class SparkContext:
def version(self) -> str:
"""
The version of Spark on which this application is running.
+
+ .. versionadded:: 1.1.0
+
+ Examples
+ --------
+ >>> _ = sc.version
"""
return self._jsc.version()
@@ -522,6 +560,8 @@ class SparkContext:
* in case of local spark app something like 'local-1433865536131'
* in case of YARN something like 'application_1433865536131_34483'
+ .. versionadded:: 1.5.0
+
Examples
--------
>>> sc.applicationId # doctest: +ELLIPSIS
@@ -531,19 +571,40 @@ class SparkContext:
@property
def uiWebUrl(self) -> str:
- """Return the URL of the SparkUI instance started by this SparkContext"""
+ """Return the URL of the SparkUI instance started by this :class:`SparkContext`
+
+ .. versionadded:: 2.1.0
+
+ Examples
+ --------
+ >>> sc.uiWebUrl
+ 'http://...'
+ """
return self._jsc.sc().uiWebUrl().get()
@property
def startTime(self) -> int:
- """Return the epoch time when the Spark Context was started."""
+ """Return the epoch time when the :class:`SparkContext` was started.
+
+ .. versionadded:: 1.5.0
+
+ Examples
+ --------
+ >>> _ = sc.startTime
+ """
return self._jsc.startTime()
@property
def defaultParallelism(self) -> int:
"""
- Default level of parallelism to use when not given by user (e.g. for
- reduce tasks)
+ Default level of parallelism to use when not given by user (e.g. for reduce tasks)
+
+ .. versionadded:: 0.7.0
+
+ Examples
+ --------
+ >>> sc.defaultParallelism > 0
+ True
"""
return self._jsc.sc().defaultParallelism()
@@ -551,12 +612,21 @@ class SparkContext:
def defaultMinPartitions(self) -> int:
"""
Default min number of partitions for Hadoop RDDs when not given by user
+
+ .. versionadded:: 1.1.0
+
+ Examples
+ --------
+ >>> sc.defaultMinPartitions > 0
+ True
"""
return self._jsc.sc().defaultMinPartitions()
def stop(self) -> None:
"""
- Shut down the SparkContext.
+ Shut down the :class:`SparkContext`.
+
+ .. versionadded:: 0.7.0
"""
if getattr(self, "_jsc", None):
try:
@@ -579,7 +649,21 @@ class SparkContext:
def emptyRDD(self) -> RDD[Any]:
"""
- Create an RDD that has no partitions or elements.
+ Create an :class:`RDD` that has no partitions or elements.
+
+ .. versionadded:: 1.5.0
+
+ Returns
+ -------
+ :class:`RDD`
+ An empty RDD
+
+ Examples
+ --------
+ >>> sc.emptyRDD()
+ EmptyRDD...
+ >>> sc.emptyRDD().count()
+ 0
"""
return RDD(self._jsc.emptyRDD(), self, NoOpSerializer())
@@ -592,22 +676,28 @@ class SparkContext:
way as python's built-in range() function. If called with a single argument,
the argument is interpreted as `end`, and `start` is set to 0.
+ .. versionadded:: 1.5.0
+
Parameters
----------
start : int
the start value
end : int, optional
the end value (exclusive)
- step : int, optional
- the incremental step (default: 1)
+ step : int, optional, default 1
+ the incremental step
numSlices : int, optional
the number of partitions of the new RDD
Returns
-------
- :py:class:`pyspark.RDD`
+ :class:`RDD`
An RDD of int
+ See Also
+ --------
+ :meth:`pyspark.sql.SparkSession.range`
+
Examples
--------
>>> sc.range(5).collect()
@@ -616,6 +706,20 @@ class SparkContext:
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]
+
+ Generate RDD with a negative step
+
+ >>> sc.range(5, 0, -1).collect()
+ [5, 4, 3, 2, 1]
+ >>> sc.range(0, 5, -1).collect()
+ []
+
+ Control the number of partitions
+
+ >>> sc.range(5, numSlices=1).getNumPartitions()
+ 1
+ >>> sc.range(5, numSlices=10).getNumPartitions()
+ 10
"""
if end is None:
end = start
@@ -628,12 +732,32 @@ class SparkContext:
Distribute a local Python collection to form an RDD. Using range
is recommended if the input represents a range for performance.
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ c : :class:`collections.abc.Iterable`
+ iterable collection to distribute
+ numSlices : int, optional
+ the number of partitions of the new RDD
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD representing distributed collection.
+
Examples
--------
>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(range(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
+
+ Deal with a list of strings.
+
+ >>> strings = ["a", "b", "c"]
+ >>> sc.parallelize(strings, 2).glom().collect()
+ [['a'], ['b', 'c']]
"""
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
if isinstance(c, range):
@@ -694,7 +818,7 @@ class SparkContext:
--------
data
object to be serialized
- serializer : :py:class:`pyspark.serializers.Serializer`
+ serializer : :class:`pyspark.serializers.Serializer`
reader_func : function
A function which takes a filename and reads in the data in the jvm and
returns a JavaRDD. Only used when encryption is disabled.
@@ -731,13 +855,51 @@ class SparkContext:
"""
Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method.
+ .. versionadded:: 1.1.0
+
+ Parameters
+ ----------
+ name : str
+ directory to the input data files, the path can be comma separated
+ paths as a list of inputs
+ minPartitions : int, optional
+ suggested minimum number of partitions for the resulting RDD
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD representing unpickled data from the file(s).
+
+ See Also
+ --------
+ :meth:`RDD.saveAsPickleFile`
+
Examples
--------
- >>> tmpFile = NamedTemporaryFile(delete=True)
- >>> tmpFile.close()
- >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
- >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
+ >>> import os
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # Write a temporary pickled file
+ ... path1 = os.path.join(d, "pickled1")
+ ... sc.parallelize(range(10)).saveAsPickleFile(path1, 3)
+ ...
+ ... # Write another temporary pickled file
+ ... path2 = os.path.join(d, "pickled2")
+ ... sc.parallelize(range(-10, -5)).saveAsPickleFile(path2, 3)
+ ...
+ ... # Load pickled file
+ ... collected1 = sorted(sc.pickleFile(path1, 3).collect())
+ ... collected2 = sorted(sc.pickleFile(path2, 4).collect())
+ ...
+ ... # Load two pickled files together
+ ... collected3 = sorted(sc.pickleFile('{},{}'.format(path1, path2), 5).collect())
+
+ >>> collected1
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+ >>> collected2
+ [-10, -9, -8, -7, -6]
+ >>> collected3
+ [-10, -9, -8, -7, -6, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.objectFile(name, minPartitions), self)
@@ -748,21 +910,60 @@ class SparkContext:
"""
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
- RDD of Strings.
- The text files must be encoded as UTF-8.
+ RDD of Strings. The text files must be encoded as UTF-8.
- If use_unicode is False, the strings will be kept as `str` (encoding
- as `utf-8`), which is faster and smaller than unicode. (Added in
- Spark 1.2)
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ name : str
+ directory to the input data files, the path can be comma separated
+ paths as a list of inputs
+ minPartitions : int, optional
+ suggested minimum number of partitions for the resulting RDD
+ use_unicode : bool, default True
+ If `use_unicode` is False, the strings will be kept as `str` (encoding
+ as `utf-8`), which is faster and smaller than unicode.
+
+ .. versionadded:: 1.2.0
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD representing text data from the file(s).
+
+ See Also
+ --------
+ :meth:`RDD.saveAsTextFile`
+ :meth:`SparkContext.wholeTextFiles`
Examples
--------
- >>> path = os.path.join(tempdir, "sample-text.txt")
- >>> with open(path, "w") as testFile:
- ... _ = testFile.write("Hello world!")
- >>> textFile = sc.textFile(path)
- >>> textFile.collect()
- ['Hello world!']
+ >>> import os
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path1 = os.path.join(d, "text1")
+ ... path2 = os.path.join(d, "text2")
+ ...
+ ... # Write a temporary text file
+ ... sc.parallelize(["x", "y", "z"]).saveAsTextFile(path1)
+ ...
+ ... # Write another temporary text file
+ ... sc.parallelize(["aa", "bb", "cc"]).saveAsTextFile(path2)
+ ...
+ ... # Load text file
+ ... collected1 = sorted(sc.textFile(path1, 3).collect())
+ ... collected2 = sorted(sc.textFile(path2, 4).collect())
+ ...
+ ... # Load two text files together
+ ... collected3 = sorted(sc.textFile('{},{}'.format(path1, path2), 5).collect())
+
+ >>> collected1
+ ['x', 'y', 'z']
+ >>> collected2
+ ['aa', 'bb', 'cc']
+ >>> collected3
+ ['aa', 'bb', 'cc', 'x', 'y', 'z']
"""
minPartitions = minPartitions or min(self.defaultParallelism, 2)
return RDD(self._jsc.textFile(name, minPartitions), self, UTF8Deserializer(use_unicode))
@@ -778,9 +979,7 @@ class SparkContext:
value is the content of each file.
The text files must be encoded as UTF-8.
- If `use_unicode` is False, the strings will be kept as `str` (encoding
- as `utf-8`), which is faster and smaller than unicode. (Added in
- Spark 1.2)
+ .. versionadded:: 1.0.0
For example, if you have the following files:
@@ -801,21 +1000,49 @@ class SparkContext:
...
(a-hdfs-path/part-nnnnn, its content)
+ Parameters
+ ----------
+ path : str
+ directory to the input data files, the path can be comma separated
+ paths as a list of inputs
+ minPartitions : int, optional
+ suggested minimum number of partitions for the resulting RDD
+ use_unicode : bool, default True
+ If `use_unicode` is False, the strings will be kept as `str` (encoding
+ as `utf-8`), which is faster and smaller than unicode.
+
+ .. versionadded:: 1.2.0
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD representing path-content pairs from the file(s).
+
Notes
-----
Small files are preferred, as each file will be loaded fully in memory.
+ See Also
+ --------
+ :meth:`RDD.saveAsTextFile`
+ :meth:`SparkContext.textFile`
+
Examples
--------
- >>> dirPath = os.path.join(tempdir, "files")
- >>> os.mkdir(dirPath)
- >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
- ... _ = file1.write("1")
- >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
- ... _ = file2.write("2")
- >>> textFiles = sc.wholeTextFiles(dirPath)
- >>> sorted(textFiles.collect())
- [('.../1.txt', '1'), ('.../2.txt', '2')]
+ >>> import os
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # Write a temporary text file
+ ... with open(os.path.join(d, "1.txt"), "w") as f:
+ ... _ = f.write("123")
+ ...
+ ... # Write another temporary text file
+ ... with open(os.path.join(d, "2.txt"), "w") as f:
+ ... _ = f.write("xyz")
+ ...
+ ... collected = sorted(sc.wholeTextFiles(d).collect())
+ >>> collected
+ [('.../1.txt', '123'), ('.../2.txt', 'xyz')]
"""
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(
@@ -832,9 +1059,46 @@ class SparkContext:
in a key-value pair, where the key is the path of each file, the
value is the content of each file.
+ .. versionadded:: 1.3.0
+
+ Parameters
+ ----------
+ path : str
+ directory to the input data files, the path can be comma separated
+ paths as a list of inputs
+ minPartitions : int, optional
+ suggested minimum number of partitions for the resulting RDD
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD representing path-content pairs from the file(s).
+
Notes
-----
Small files are preferred, large file is also allowable, but may cause bad performance.
+
+ See Also
+ --------
+ :meth:`SparkContext.binaryRecords`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # Write a temporary binary file
+ ... with open(os.path.join(d, "1.bin"), "wb") as f1:
+ ... _ = f1.write(b"binary data I")
+ ...
+ ... # Write another temporary binary file
+ ... with open(os.path.join(d, "2.bin"), "wb") as f2:
+ ... _ = f2.write(b"binary data II")
+ ...
+ ... collected = sorted(sc.binaryFiles(d).collect())
+
+ >>> collected
+ [('.../1.bin', b'binary data I'), ('.../2.bin', b'binary data II')]
"""
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(
@@ -849,12 +1113,43 @@ class SparkContext:
with the specified numerical format (see ByteBuffer), and the number of
bytes per record is constant.
+ .. versionadded:: 1.3.0
+
Parameters
----------
path : str
Directory to the input data files
recordLength : int
The length at which to split the records
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD of data with values, represented as byte arrays
+
+ See Also
+ --------
+ :meth:`SparkContext.binaryFiles`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # Write a temporary file
+ ... with open(os.path.join(d, "1.bin"), "w") as f:
+ ... for i in range(3):
+ ... _ = f.write("%04d" % i)
+ ...
+ ... # Write another file
+ ... with open(os.path.join(d, "2.bin"), "w") as f:
+ ... for i in [-1, -2, -10]:
+ ... _ = f.write("%04d" % i)
+ ...
+ ... collected = sorted(sc.binaryRecords(d, 4).collect())
+
+ >>> collected
+ [b'-001', b'-002', b'-010', b'0000', b'0001', b'0002']
"""
return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())
@@ -888,6 +1183,8 @@ class SparkContext:
3. If this fails, the fallback is to call 'toString' on each key and value
4. :class:`CPickleSerializer` is used to deserialize pickled objects on the Python side
+ .. versionadded:: 1.3.0
+
Parameters
----------
path : str
@@ -903,9 +1200,43 @@ class SparkContext:
fully qualified name of a function returning value WritableConverter
minSplits : int, optional
minimum splits in dataset (default min(2, sc.defaultParallelism))
- batchSize : int, optional
+ batchSize : int, optional, default 0
The number of Python objects represented as a single
Java object. (default 0, choose batchSize automatically)
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD of tuples of key and corresponding value
+
+ See Also
+ --------
+ :meth:`RDD.saveAsSequenceFile`
+ :meth:`RDD.saveAsNewAPIHadoopFile`
+ :meth:`RDD.saveAsHadoopFile`
+ :meth:`SparkContext.newAPIHadoopFile`
+ :meth:`SparkContext.hadoopFile`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ Set the class of output format
+
+ >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "hadoop_file")
+ ...
+ ... # Write a temporary Hadoop file
+ ... rdd = sc.parallelize([(1, {3.0: "bb"}), (2, {1.0: "aa"}), (3, {2.0: "dd"})])
+ ... rdd.saveAsNewAPIHadoopFile(path, output_format_class)
+ ...
+ ... collected = sorted(sc.sequenceFile(path).collect())
+
+ >>> collected
+ [(1, {3.0: 'bb'}), (2, {1.0: 'aa'}), (3, {2.0: 'dd'})]
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
assert self._jvm is not None
@@ -935,11 +1266,13 @@ class SparkContext:
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
- The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`.
+ The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a
Configuration in Java
+ .. versionadded:: 1.1.0
+
Parameters
----------
path : str
@@ -962,9 +1295,47 @@ class SparkContext:
conf : dict, optional
Hadoop configuration, passed in as a dict
None by default
- batchSize : int, optional
+ batchSize : int, optional, default 0
The number of Python objects represented as a single
Java object. (default 0, choose batchSize automatically)
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD of tuples of key and corresponding value
+
+ See Also
+ --------
+ :meth:`RDD.saveAsSequenceFile`
+ :meth:`RDD.saveAsNewAPIHadoopFile`
+ :meth:`RDD.saveAsHadoopFile`
+ :meth:`SparkContext.sequenceFile`
+ :meth:`SparkContext.hadoopFile`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ Set the related classes
+
+ >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
+ >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
+ >>> key_class = "org.apache.hadoop.io.IntWritable"
+ >>> value_class = "org.apache.hadoop.io.Text"
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "new_hadoop_file")
+ ...
+ ... # Write a temporary Hadoop file
+ ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
+ ... rdd.saveAsNewAPIHadoopFile(path, output_format_class, key_class, value_class)
+ ...
+ ... loaded = sc.newAPIHadoopFile(path, input_format_class, key_class, value_class)
+ ... collected = sorted(loaded.collect())
+
+ >>> collected
+ [(1, ''), (1, 'a'), (3, 'x')]
"""
jconf = self._dictToJavaMap(conf)
assert self._jvm is not None
@@ -995,7 +1366,9 @@ class SparkContext:
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
This will be converted into a Configuration in Java.
- The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`.
+ The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
+
+ .. versionadded:: 1.1.0
Parameters
----------
@@ -1015,9 +1388,58 @@ class SparkContext:
(None by default)
conf : dict, optional
Hadoop configuration, passed in as a dict (None by default)
- batchSize : int, optional
+ batchSize : int, optional, default 0
The number of Python objects represented as a single
Java object. (default 0, choose batchSize automatically)
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD of tuples of key and corresponding value
+
+ See Also
+ --------
+ :meth:`RDD.saveAsNewAPIHadoopDataset`
+ :meth:`RDD.saveAsHadoopDataset`
+ :meth:`SparkContext.hadoopRDD`
+ :meth:`SparkContext.hadoopFile`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ Set the related classes
+
+ >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
+ >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
+ >>> key_class = "org.apache.hadoop.io.IntWritable"
+ >>> value_class = "org.apache.hadoop.io.Text"
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "new_hadoop_file")
+ ...
+ ... # Create the conf for writing
+ ... write_conf = {
+ ... "mapreduce.job.outputformat.class": (output_format_class),
+ ... "mapreduce.job.output.key.class": key_class,
+ ... "mapreduce.job.output.value.class": value_class,
+ ... "mapreduce.output.fileoutputformat.outputdir": path,
+ ... }
+ ...
+ ... # Write a temporary Hadoop file
+ ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
+ ... rdd.saveAsNewAPIHadoopDataset(conf=write_conf)
+ ...
+ ... # Create the conf for reading
+ ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
+ ...
+ ... loaded = sc.newAPIHadoopRDD(input_format_class,
+ ... key_class, value_class, conf=read_conf)
+ ... collected = sorted(loaded.collect())
+
+ >>> collected
+ [(1, ''), (1, 'a'), (3, 'x')]
"""
jconf = self._dictToJavaMap(conf)
assert self._jvm is not None
@@ -1047,11 +1469,15 @@ class SparkContext:
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
- The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`.
+ The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
+
+ .. versionadded:: 1.1.0
A Hadoop configuration can be passed in as a Python dict. This will be converted into a
Configuration in Java.
+ Parameters
+ ----------
path : str
path to Hadoop file
inputFormatClass : str
@@ -1064,15 +1490,51 @@ class SparkContext:
(e.g. "org.apache.hadoop.io.LongWritable")
keyConverter : str, optional
fully qualified name of a function returning key WritableConverter
- (None by default)
valueConverter : str, optional
fully qualified name of a function returning value WritableConverter
- (None by default)
conf : dict, optional
- Hadoop configuration, passed in as a dict (None by default)
- batchSize : int, optional
+ Hadoop configuration, passed in as a dict
+ batchSize : int, optional, default 0
The number of Python objects represented as a single
Java object. (default 0, choose batchSize automatically)
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD of tuples of key and corresponding value
+
+ See Also
+ --------
+ :meth:`RDD.saveAsSequenceFile`
+ :meth:`RDD.saveAsNewAPIHadoopFile`
+ :meth:`RDD.saveAsHadoopFile`
+ :meth:`SparkContext.newAPIHadoopFile`
+ :meth:`SparkContext.hadoopRDD`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ Set the related classes
+
+ >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
+ >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
+ >>> key_class = "org.apache.hadoop.io.IntWritable"
+ >>> value_class = "org.apache.hadoop.io.Text"
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "old_hadoop_file")
+ ...
+ ... # Write a temporary Hadoop file
+ ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
+ ... rdd.saveAsHadoopFile(path, output_format_class, key_class, value_class)
+ ...
+ ... loaded = sc.hadoopFile(path, input_format_class, key_class, value_class)
+ ... collected = sorted(loaded.collect())
+
+ >>> collected
+ [(0, '1\\t'), (0, '1\\ta'), (0, '3\\tx')]
"""
jconf = self._dictToJavaMap(conf)
assert self._jvm is not None
@@ -1103,7 +1565,9 @@ class SparkContext:
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
This will be converted into a Configuration in Java.
- The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`.
+ The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
+
+ .. versionadded:: 1.1.0
Parameters
----------
@@ -1117,15 +1581,61 @@ class SparkContext:
(e.g. "org.apache.hadoop.io.LongWritable")
keyConverter : str, optional
fully qualified name of a function returning key WritableConverter
- (None by default)
valueConverter : str, optional
fully qualified name of a function returning value WritableConverter
- (None by default)
conf : dict, optional
- Hadoop configuration, passed in as a dict (None by default)
- batchSize : int, optional
+ Hadoop configuration, passed in as a dict
+ batchSize : int, optional, default 0
The number of Python objects represented as a single
Java object. (default 0, choose batchSize automatically)
+
+ Returns
+ -------
+ :class:`RDD`
+ RDD of tuples of key and corresponding value
+
+ See Also
+ --------
+ :meth:`RDD.saveAsNewAPIHadoopDataset`
+ :meth:`RDD.saveAsHadoopDataset`
+ :meth:`SparkContext.newAPIHadoopRDD`
+ :meth:`SparkContext.hadoopFile`
+
+ Examples
+ --------
+ >>> import os
+ >>> import tempfile
+
+ Set the related classes
+
+ >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
+ >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
+ >>> key_class = "org.apache.hadoop.io.IntWritable"
+ >>> value_class = "org.apache.hadoop.io.Text"
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "old_hadoop_file")
+ ...
+ ... # Create the conf for writing
+ ... write_conf = {
+ ... "mapred.output.format.class": output_format_class,
+ ... "mapreduce.job.output.key.class": key_class,
+ ... "mapreduce.job.output.value.class": value_class,
+ ... "mapreduce.output.fileoutputformat.outputdir": path,
+ ... }
+ ...
+ ... # Write a temporary Hadoop file
+ ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
+ ... rdd.saveAsHadoopDataset(conf=write_conf)
+ ...
+ ... # Create the conf for reading
+ ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
+ ...
+ ... loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)
+ ... collected = sorted(loaded.collect())
+
+ >>> collected
+ [(0, '1\\t'), (0, '1\\ta'), (0, '3\\tx')]
"""
jconf = self._dictToJavaMap(conf)
assert self._jvm is not None
@@ -1153,16 +1663,28 @@ class SparkContext:
although this forces them to be reserialized using the default serializer:
+ .. versionadded:: 0.7.0
+
+ See Also
+ --------
+ :meth:`RDD.union`
+
Examples
--------
- >>> path = os.path.join(tempdir, "union-text.txt")
- >>> with open(path, "w") as testFile:
- ... _ = testFile.write("Hello")
- >>> textFile = sc.textFile(path)
- >>> textFile.collect()
- ['Hello']
- >>> parallelized = sc.parallelize(["World!"])
- >>> sorted(sc.union([textFile, parallelized]).collect())
+ >>> import os
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # generate a text RDD
+ ... with open(os.path.join(d, "union-text.txt"), "w") as f:
+ ... _ = f.write("Hello")
+ ... text_rdd = sc.textFile(d)
+ ...
+ ... # generate another RDD
+ ... parallelized = sc.parallelize(["World!"])
+ ...
+ ... unioned = sorted(sc.union([text_rdd, parallelized]).collect())
+
+ >>> unioned
['Hello', 'World!']
"""
first_jrdd_deserializer = rdds[0]._jrdd_deserializer
@@ -1194,6 +1716,30 @@ class SparkContext:
Broadcast a read-only variable to the cluster, returning a :class:`Broadcast`
object for reading it in distributed functions. The variable will
be sent to each cluster only once.
+
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ value : T
+ value to broadcast to the Spark nodes
+
+ Returns
+ -------
+ :class:`Broadcast`
+ :class:`Broadcast` object, a read-only variable cached on each machine
+
+ Examples
+ --------
+ >>> mapping = {1: 10001, 2: 10002}
+ >>> bc = sc.broadcast(mapping)
+
+ >>> rdd = sc.range(5)
+ >>> rdd2 = rdd.map(lambda i: bc.value[i] if i in bc.value else -1)
+ >>> rdd2.collect()
+ [-1, 10001, 10002, -1, -1]
+
+ >>> bc.destroy()
"""
return Broadcast(self, value, self._pickled_broadcast_vars)
@@ -1206,6 +1752,39 @@ class SparkContext:
data type if provided. Default AccumulatorParams are used for integers
and floating-point numbers if you do not provide one. For other types,
a custom AccumulatorParam can be used.
+
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ value : T
+ initialized value
+ accum_param : :class:`pyspark.AccumulatorParam`, optional
+ helper object to define how to add values
+
+ Returns
+ -------
+ :class:`Accumulator`
+ `Accumulator` object, a shared variable that can be accumulated
+
+ Examples
+ --------
+ >>> acc = sc.accumulator(9)
+ >>> acc.value
+ 9
+ >>> acc += 1
+ >>> acc.value
+ 10
+
+ Accumulator object can be accumulated in RDD operations:
+
+ >>> rdd = sc.range(5)
+ >>> def f(x):
+ ... global acc
+ ... acc += 1
+ >>> rdd.foreach(f)
+ >>> acc.value
+ 15
"""
if accum_param is None:
if isinstance(value, int):
@@ -1232,31 +1811,67 @@ class SparkContext:
A directory can be given if the recursive option is set to True.
Currently directories are only supported for Hadoop-supported filesystems.
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ path : str
+ can be either a local file, a file in HDFS (or other Hadoop-supported
+ filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+ use :meth:`SparkFiles.get` to find its download location.
+ recursive : bool, default False
+ whether to recursively add files in the input directory
+
+ See Also
+ --------
+ :meth:`SparkContext.listFiles`
+ :meth:`SparkContext.addPyFile`
+ :meth:`SparkFiles.get`
+
Notes
-----
A path can be added only once. Subsequent additions of the same path are ignored.
Examples
--------
+ >>> import os
+ >>> import tempfile
>>> from pyspark import SparkFiles
- >>> path = os.path.join(tempdir, "test.txt")
- >>> with open(path, "w") as testFile:
- ... _ = testFile.write("100")
- >>> sc.addFile(path)
- >>> def func(iterator):
- ... with open(SparkFiles.get("test.txt")) as testFile:
- ... fileVal = int(testFile.readline())
- ... return [x * fileVal for x in iterator]
- >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path1 = os.path.join(d, "test1.txt")
+ ... with open(path1, "w") as f:
+ ... _ = f.write("100")
+ ...
+ ... path2 = os.path.join(d, "test2.txt")
+ ... with open(path2, "w") as f:
+ ... _ = f.write("200")
+ ...
+ ... sc.addFile(path1)
+ ... file_list1 = sorted(sc.listFiles)
+ ...
+ ... sc.addFile(path2)
+ ... file_list2 = sorted(sc.listFiles)
+ ...
+ ... # add path2 twice, this addition will be ignored
+ ... sc.addFile(path2)
+ ... file_list3 = sorted(sc.listFiles)
+ ...
+ ... def func(iterator):
+ ... with open(SparkFiles.get("test1.txt")) as f:
+ ... mul = int(f.readline())
+ ... return [x * mul for x in iterator]
+ ...
+ ... collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+
+ >>> file_list1
+ ['file:/.../test1.txt']
+ >>> file_list2
+ ['file:/.../test1.txt', 'file:/.../test2.txt']
+ >>> file_list3
+ ['file:/.../test1.txt', 'file:/.../test2.txt']
+ >>> collected
[100, 200, 300, 400]
- >>> sc.listFiles
- ['file:/.../test.txt']
- >>> path2 = os.path.join(tempdir, "test2.txt")
- >>> with open(path2, "w") as testFile:
- ... _ = testFile.write("100")
- >>> sc.addFile(path2)
- >>> sorted(sc.listFiles)
- ['file:/.../test.txt', 'file:/.../test2.txt']
"""
self._jsc.sc().addFile(path, recursive)
@@ -1268,7 +1883,7 @@ class SparkContext:
See Also
--------
- SparkContext.addFile
+ :meth:`SparkContext.addFile`
"""
return list(
self._jvm.scala.collection.JavaConverters.seqAsJavaList( # type: ignore[union-attr]
@@ -1283,6 +1898,17 @@ class SparkContext:
file, a file in HDFS (or other Hadoop-supported filesystems), or an
HTTP, HTTPS or FTP URI.
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ path : str
+ can be either a .py file or .zip dependency.
+
+ See Also
+ --------
+ :meth:`SparkContext.addFile`
+
Notes
-----
A path can be added only once. Subsequent additions of the same path are ignored.
@@ -1310,6 +1936,18 @@ class SparkContext:
.. versionadded:: 3.3.0
+ Parameters
+ ----------
+ path : str
+ can be either a local file, a file in HDFS (or other Hadoop-supported
+ filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+ use :meth:`SparkFiles.get` to find its download location.
+
+ See Also
+ --------
+ :meth:`SparkContext.listArchives`
+ :meth:`SparkFiles.get`
+
Notes
-----
A path can be added only once. Subsequent additions of the same path are ignored.
@@ -1319,34 +1957,48 @@ class SparkContext:
--------
Creates a zipped file that contains a text file written '100'.
+ >>> import os
+ >>> import tempfile
>>> import zipfile
>>> from pyspark import SparkFiles
- >>> path = os.path.join(tempdir, "test.txt")
- >>> zip_path = os.path.join(tempdir, "test.zip")
- >>> with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped:
- ... with open(path, "w") as f:
- ... _ = f.write("100")
- ... zipped.write(path, os.path.basename(path))
- >>> sc.addArchive(zip_path)
- >>> sc.listArchives
- ['file:/.../test.zip']
- >>> zip_path2 = os.path.join(tempdir, "test2.zip")
- >>> with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as zipped:
+
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... path = os.path.join(d, "test.txt")
... with open(path, "w") as f:
... _ = f.write("100")
- ... zipped.write(path, os.path.basename(path))
- >>> sc.addArchive(zip_path2)
- >>> sorted(sc.listArchives)
- ['file:/.../test.zip', 'file:/.../test2.zip']
-
- Reads the '100' as an integer in the zipped file, and processes
- it with the data in the RDD.
-
- >>> def func(iterator):
- ... with open("%s/test.txt" % SparkFiles.get("test.zip")) as f:
- ... v = int(f.readline())
- ... return [x * int(v) for x in iterator]
- >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+ ...
+ ... zip_path1 = os.path.join(d, "test1.zip")
+ ... with zipfile.ZipFile(zip_path1, "w", zipfile.ZIP_DEFLATED) as z:
+ ... z.write(path, os.path.basename(path))
+ ...
+ ... zip_path2 = os.path.join(d, "test2.zip")
+ ... with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as z:
+ ... z.write(path, os.path.basename(path))
+ ...
+ ... sc.addArchive(zip_path1)
+ ... arch_list1 = sorted(sc.listArchives)
+ ...
+ ... sc.addArchive(zip_path2)
+ ... arch_list2 = sorted(sc.listArchives)
+ ...
+ ... # add zip_path2 twice, this addition will be ignored
+ ... sc.addArchive(zip_path2)
+ ... arch_list3 = sorted(sc.listArchives)
+ ...
+ ... def func(iterator):
+ ... with open("%s/test.txt" % SparkFiles.get("test1.zip")) as f:
+ ... mul = int(f.readline())
+ ... return [x * mul for x in iterator]
+ ...
+ ... collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+
+ >>> arch_list1
+ ['file:/.../test1.zip']
+ >>> arch_list2
+ ['file:/.../test1.zip', 'file:/.../test2.zip']
+ >>> arch_list3
+ ['file:/.../test1.zip', 'file:/.../test2.zip']
+ >>> collected
[100, 200, 300, 400]
"""
self._jsc.sc().addArchive(path)
@@ -1359,7 +2011,7 @@ class SparkContext:
See Also
--------
- SparkContext.addArchive
+ :meth:`SparkContext.addArchive`
"""
return list(
self._jvm.scala.collection.JavaConverters.seqAsJavaList( # type: ignore[union-attr]
@@ -1371,14 +2023,31 @@ class SparkContext:
"""
Set the directory under which RDDs are going to be checkpointed. The
directory must be an HDFS path if running on a cluster.
+
+ .. versionadded:: 0.7.0
+
+ Parameters
+ ----------
+ dirName : str
+ path to the directory where checkpoint files will be stored
+ (must be HDFS path if running in cluster)
+
+ See Also
+ --------
+ :meth:`SparkContext.getCheckpointDir`
"""
self._jsc.sc().setCheckpointDir(dirName)
- @since(3.1)
def getCheckpointDir(self) -> Optional[str]:
"""
Return the directory where RDDs are checkpointed. Returns None if no
checkpoint directory has been set.
+
+ .. versionadded:: 3.1.0
+
+ See Also
+ --------
+ :meth:`SparkContext.setCheckpointDir`
"""
if not self._jsc.sc().getCheckpointDir().isEmpty():
return self._jsc.sc().getCheckpointDir().get()
@@ -1412,6 +2081,17 @@ class SparkContext:
The application can use :meth:`SparkContext.cancelJobGroup` to cancel all
running jobs in this group.
+ .. versionadded:: 1.0.0
+
+ Parameters
+ ----------
+ groupId : str
+ The group ID to assign.
+ description : str
+ The description to set for the job group.
+ interruptOnCancel : bool, optional, default False
+ whether to interrupt jobs on job cancellation.
+
Notes
-----
If interruptOnCancel is set to true for the job group, then job cancellation will result
@@ -1422,6 +2102,10 @@ class SparkContext:
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
local inheritance.
+ See Also
+ --------
+ :meth:`SparkContext.cancelJobGroup`
+
Examples
--------
>>> import threading
@@ -1457,6 +2141,19 @@ class SparkContext:
Set a local property that affects jobs submitted from this thread,
such as the
Spark fair scheduler pool.
+ .. versionadded:: 1.0.0
+
+ Parameters
+ ----------
+ key : str
+ The key of the local property to set.
+ value : str
+ The value of the local property to set.
+
+ See Also
+ --------
+ :meth:`SparkContext.getLocalProperty`
+
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
@@ -1468,6 +2165,12 @@ class SparkContext:
"""
Get a local property set in this thread, or null if it is missing. See
:meth:`setLocalProperty`.
+
+ .. versionadded:: 1.0.0
+
+ See Also
+ --------
+ :meth:`SparkContext.setLocalProperty`
"""
return self._jsc.getLocalProperty(key)
@@ -1475,6 +2178,13 @@ class SparkContext:
"""
Set a human readable description of the current job.
+ .. versionadded:: 2.3.0
+
+ Parameters
+ ----------
+ value : str
+ The job description to set.
+
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
@@ -1485,6 +2195,8 @@ class SparkContext:
def sparkUser(self) -> str:
"""
Get SPARK_USER for user who is running SparkContext.
+
+ .. versionadded:: 1.0.0
"""
return self._jsc.sc().sparkUser()
@@ -1492,18 +2204,39 @@ class SparkContext:
"""
Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`
for more information.
+
+ .. versionadded:: 1.1.0
+
+ Parameters
+ ----------
+ groupId : str
+ The group ID to cancel the job.
+
+ See Also
+ --------
+ :meth:`SparkContext.setJobGroup`
+ :meth:`SparkContext.cancelJobGroup`
"""
self._jsc.sc().cancelJobGroup(groupId)
def cancelAllJobs(self) -> None:
"""
Cancel all jobs that have been scheduled or are running.
+
+ .. versionadded:: 1.1.0
+
+ See Also
+ --------
+ :meth:`SparkContext.cancelJobGroup`
+ :meth:`SparkContext.runJob`
"""
self._jsc.sc().cancelAllJobs()
def statusTracker(self) -> StatusTracker:
"""
Return :class:`StatusTracker` object
+
+ .. versionadded:: 1.4.0
"""
return StatusTracker(self._jsc.statusTracker())
@@ -1520,6 +2253,29 @@ class SparkContext:
If 'partitions' is not specified, this will run over all partitions.
+ .. versionadded:: 1.1.0
+
+ Parameters
+ ----------
+ rdd : :class:`RDD`
+ target RDD to run tasks on
+ partitionFunc : function
+ a function to run on each partition of the RDD
+ partitions : list, optional
+ set of partitions to run on; some jobs may not want to compute on all
+ partitions of the target RDD, e.g. for operations like `first`
+ allowLocal : bool, default False
+ this parameter takes no effect
+
+ Returns
+ -------
+ list
+ results of specified partitions
+
+ See Also
+ --------
+ :meth:`SparkContext.cancelAllJobs`
+
Examples
--------
>>> myRDD = sc.parallelize(range(6), 3)
@@ -1542,7 +2298,14 @@ class SparkContext:
return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
def show_profiles(self) -> None:
- """Print the profile stats to stdout"""
+ """Print the profile stats to stdout
+
+ .. versionadded:: 1.2.0
+
+ See Also
+ --------
+ :meth:`SparkContext.dump_profiles`
+ """
if self.profiler_collector is not None:
self.profiler_collector.show_profiles()
else:
@@ -1552,7 +2315,14 @@ class SparkContext:
)
def dump_profiles(self, path: str) -> None:
- """Dump the profile stats into directory `path`"""
+ """Dump the profile stats into directory `path`
+
+ .. versionadded:: 1.2.0
+
+ See Also
+ --------
+ :meth:`SparkContext.show_profiles`
+ """
if self.profiler_collector is not None:
self.profiler_collector.dump_profiles(path)
else:
@@ -1562,12 +2332,22 @@ class SparkContext:
)
def getConf(self) -> SparkConf:
+ """Return a copy of this SparkContext's configuration :class:`SparkConf`.
+
+ .. versionadded:: 2.1.0
+ """
conf = SparkConf()
conf.setAll(self._conf.getAll())
return conf
@property
def resources(self) -> Dict[str, ResourceInformation]:
+ """
+ Return the resource information of this :class:`SparkContext`.
+ A resource could be a GPU, FPGA, etc.
+
+ .. versionadded:: 3.0.0
+ """
resources = {}
jresources = self._jsc.resources()
for x in jresources:
@@ -1589,14 +2369,12 @@ class SparkContext:
def _test() -> None:
- import atexit
import doctest
- import tempfile
+ from pyspark import SparkConf
globs = globals().copy()
- globs["sc"] = SparkContext("local[4]", "PythonTest")
- globs["tempdir"] = tempfile.mkdtemp()
- atexit.register(lambda: shutil.rmtree(globs["tempdir"]))
+ conf = SparkConf().set("spark.ui.enabled", "True")
+ globs["sc"] = SparkContext("local[4]", "context tests", conf=conf)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs["sc"].stop()
if failure_count:
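As a side note on how the refreshed examples are exercised: the rewritten `_test()` above injects a live `sc` into the doctest globals. A rough stand-alone equivalent (a sketch only, not the project's actual test runner) would be:

    import doctest
    from pyspark import SparkConf, SparkContext
    import pyspark.context

    # Build the doctest globals the same way the patched _test() does: a local
    # SparkContext with the UI enabled so the uiWebUrl example can pass.
    globs = pyspark.context.__dict__.copy()
    conf = SparkConf().set("spark.ui.enabled", "True")
    globs["sc"] = SparkContext("local[4]", "context tests", conf=conf)
    failure_count, test_count = doctest.testmod(
        pyspark.context, globs=globs, optionflags=doctest.ELLIPSIS
    )
    globs["sc"].stop()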
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]